Posted to commits@druid.apache.org by fj...@apache.org on 2020/03/28 22:46:16 UTC

[druid] branch master updated: Fix HDFS input source split (#9574)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c0195a1  Fix HDFS input source split (#9574)
c0195a1 is described below

commit c0195a19e4e9bef50dd1bb5f1b63d03b4f3e318f
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Sat Mar 28 15:45:57 2020 -0700

    Fix HDFS input source split (#9574)
    
    Fixes an issue where splitting an HDFS input source for use in native
    parallel batch ingestion would cause the subtasks to get a split with an
    invalid HDFS path.
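
The root cause: the old withSplit implementation passed split.get().toString()
to the HdfsInputSource constructor, and List#toString renders its elements
inside square brackets. A minimal sketch of the failure mode, with a
hypothetical path value (only org.apache.hadoop.fs.Path and the JDK are
assumed):

    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.fs.Path;

    public class SplitPathDemo
    {
      public static void main(String[] args)
      {
        // A split carrying a single (hypothetical) HDFS path.
        List<Path> split = Collections.singletonList(new Path("hdfs://namenode:8020/tmp/part-0"));

        // Before the fix: List#toString wraps its elements in brackets.
        String broken = split.toString();
        System.out.println(broken); // [hdfs://namenode:8020/tmp/part-0] -- not a valid HDFS path

        // After the fix: convert each Path to its own String, as the hunk below does.
        List<String> fixed = split.stream().map(path -> path.toString()).collect(Collectors.toList());
        System.out.println(fixed.get(0)); // hdfs://namenode:8020/tmp/part-0
      }
    }

The accompanying @VisibleForTesting change relaxes getInputPaths from private
to package-private so the new test can read the paths back after the round trip.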
---
 .../apache/druid/inputsource/hdfs/HdfsInputSource.java   |  7 +++++--
 .../druid/inputsource/hdfs/HdfsInputSourceTest.java      | 16 ++++++++++++++++
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index be7f3c8..b8c798a 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
@@ -142,8 +143,9 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
     }
   }
 
+  @VisibleForTesting
   @JsonProperty(PROP_PATHS)
-  private List<String> getInputPaths()
+  List<String> getInputPaths()
   {
     return inputPaths;
   }
@@ -199,7 +201,8 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   @Override
   public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
   {
-    return new HdfsInputSource(split.get().toString(), configuration);
+    List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
+    return new HdfsInputSource(paths, configuration);
   }
 
   @Override
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index d2c7820..044930f 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
@@ -277,6 +278,21 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
       int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
       Assert.assertEquals(NUM_FILE, numSplits);
     }
+
+    @Test
+    public void createCorrectInputSourceWithSplit() throws Exception
+    {
+      // Set maxSplitSize to 1 so that each inputSplit has only one object
+      List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
+                                                  .collect(Collectors.toList());
+
+      for (InputSplit<List<Path>> split : splits) {
+        String expectedPath = Iterables.getOnlyElement(split.get()).toString();
+        HdfsInputSource inputSource = (HdfsInputSource) target.withSplit(split);
+        String actualPath = Iterables.getOnlyElement(inputSource.getInputPaths());
+        Assert.assertEquals(expectedPath, actualPath);
+      }
+    }
   }
 
   public static class EmptyPathsTest
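
With the element-wise conversion in place, splitting and re-wrapping round-trips
cleanly, which is what the new createCorrectInputSourceWithSplit test verifies. A
condensed sketch of that round trip, assuming an already-built HdfsInputSource
named target (its construction is omitted here):

    // Force one file per split so every sub-source carries exactly one path.
    List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
                                                .collect(Collectors.toList());

    for (InputSplit<List<Path>> split : splits) {
      HdfsInputSource subSource = (HdfsInputSource) target.withSplit(split);
      // subSource.getInputPaths() now yields the split's own path, e.g.
      // hdfs://namenode:8020/tmp/part-0, rather than the bracket-wrapped list string.
    }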

