You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2020/03/28 22:46:16 UTC
[druid] branch master updated: Fix HDFS input source split (#9574)
This is an automated email from the ASF dual-hosted git repository.
fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c0195a1 Fix HDFS input source split (#9574)
c0195a1 is described below
commit c0195a19e4e9bef50dd1bb5f1b63d03b4f3e318f
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Sat Mar 28 15:45:57 2020 -0700
Fix HDFS input source split (#9574)
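Fixes an issue where splitting an HDFS input source for use in native
parallel batch ingestion would cause the subtasks to get a split with an
invalid HDFS path. withSplit() built the new input source from
split.get().toString(), i.e. the string form of the whole List<Path>,
instead of converting each Path in the split to its own string.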
---
.../apache/druid/inputsource/hdfs/HdfsInputSource.java | 7 +++++--
.../druid/inputsource/hdfs/HdfsInputSourceTest.java | 16 ++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
index be7f3c8..b8c798a 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
@@ -142,8 +143,9 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
}
}
+ @VisibleForTesting
@JsonProperty(PROP_PATHS)
- private List<String> getInputPaths()
+ List<String> getInputPaths()
{
return inputPaths;
}
@@ -199,7 +201,8 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
@Override
public SplittableInputSource<List<Path>> withSplit(InputSplit<List<Path>> split)
{
- return new HdfsInputSource(split.get().toString(), configuration);
+ List<String> paths = split.get().stream().map(path -> path.toString()).collect(Collectors.toList());
+ return new HdfsInputSource(paths, configuration);
}
@Override
diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
index d2c7820..044930f 100644
--- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
+++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.inputsource.hdfs;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
@@ -277,6 +278,21 @@ public class HdfsInputSourceTest extends InitializedNullHandlingTest
int numSplits = target.estimateNumSplits(null, new MaxSizeSplitHintSpec(1L));
Assert.assertEquals(NUM_FILE, numSplits);
}
+
+ @Test
+ public void createCorrectInputSourceWithSplit() throws Exception
+ {
+ // Set maxSplitSize to 1 so that each inputSplit has only one object
+ List<InputSplit<List<Path>>> splits = target.createSplits(null, new MaxSizeSplitHintSpec(1L))
+ .collect(Collectors.toList());
+
+ for (InputSplit<List<Path>> split : splits) {
+ String expectedPath = Iterables.getOnlyElement(split.get()).toString();
+ HdfsInputSource inputSource = (HdfsInputSource) target.withSplit(split);
+ String actualPath = Iterables.getOnlyElement(inputSource.getInputPaths());
+ Assert.assertEquals(expectedPath, actualPath);
+ }
+ }
}
public static class EmptyPathsTest
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org