Posted to commits@druid.apache.org by ji...@apache.org on 2019/12/19 00:18:12 UTC

[incubator-druid] branch 0.17.0-incubating updated: Fail superbatch range partition multi dim values (#9058) (#9070)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.17.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.17.0-incubating by this push:
     new c092da0  Fail superbatch range partition multi dim values (#9058) (#9070)
c092da0 is described below

commit c092da060e15b2bd609085d56607913e528c0127
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Wed Dec 18 16:17:55 2019 -0800

    Fail superbatch range partition multi dim values (#9058) (#9070)
    
    * Fail superbatch range partition multi dim values
    
    Change the behavior of parallel indexing range partitioning to fail
    ingestion if any row has multiple values for the partition dimension.
    After this change, the behavior matches that of Hadoop indexing.
    (Previously, rows with multiple dimension values were silently skipped.)
    
    * Improve err msg, rename method, rename test class
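
To make the new failure mode concrete, here is a minimal standalone sketch (plain Java, no Druid dependencies; the class and method names are illustrative and not part of this patch) of the check the commit introduces: a row whose partition dimension carries more than one value now fails fast instead of being silently skipped.

    import java.util.Arrays;
    import java.util.List;

    public class MultiValuePartitionCheckSketch
    {
      // Mirrors getSingleOrNullDimensionValueCount() in the patch below:
      // zero values (a null dimension) or exactly one value pass through,
      // while more than one value is rejected.
      static int checkSingleValued(String partitionDimension, List<String> dimensionValues)
      {
        if (dimensionValues.size() > 1) {
          throw new IllegalArgumentException(
              "Cannot partition on multi-value dimension [" + partitionDimension + "]"
          );
        }
        return dimensionValues.size();
      }

      public static void main(String[] args)
      {
        System.out.println(checkSingleValued("dim", Arrays.asList("a")));  // prints 1
        checkSingleValued("dim", Arrays.asList("a", "b"));                 // throws IllegalArgumentException
      }
    }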
---
 docs/ingestion/native-batch.md                     |  2 +-
 ...ePartitionIndexTaskInputRowIteratorBuilder.java | 22 ++++++++++---
 .../PartialDimensionDistributionTaskTest.java      |  8 ++---
 ...itionIndexTaskInputRowIteratorBuilderTest.java} | 38 ++++++++++++++++++++--
 4 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index 8934326..124c49f 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -268,7 +268,7 @@ The three `partitionsSpec` types have different pros and cons:
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |type|This should always be `single_dim`|none|yes|
-|partitionDimension|The dimension to partition on. Only rows with a single dimension value will be included.|none|yes|
+|partitionDimension|The dimension to partition on. Only rows with a single dimension value are allowed.|none|yes|
 |targetRowsPerSegment|Target number of rows to include in a partition, should be a number that targets segments of 500MB\~1GB.|none|either this or `maxRowsPerSegment`|
 |maxRowsPerSegment|Maximum number of rows to include in a partition. Defaults to 50% larger than the `targetRowsPerSegment`.|none|either this or `targetRowsPerSegment`|
 |assumeGrouped|Assume that input data has already been grouped on time and dimensions. Ingestion will run faster, but may choose sub-optimal partitions if this assumption is violated.|false|no|
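
For orientation, the table above describes the `single_dim` partitionsSpec to which this new failure behavior applies. Below is a hedged sketch (assuming Jackson on the classpath; "country" and the row count are made-up example values, not taken from the patch) of what such a spec looks like when built and serialized from plain Java:

    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SingleDimPartitionsSpecExample
    {
      public static void main(String[] args) throws Exception
      {
        // Property names come from the table above; the values are examples only.
        Map<String, Object> partitionsSpec = new LinkedHashMap<>();
        partitionsSpec.put("type", "single_dim");
        partitionsSpec.put("partitionDimension", "country");    // must be single-valued in every row after this patch
        partitionsSpec.put("targetRowsPerSegment", 5_000_000);  // alternatively maxRowsPerSegment; specify one or the other
        partitionsSpec.put("assumeGrouped", false);

        System.out.println(
            new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(partitionsSpec)
        );
      }
    }
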
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
index 4373af4..171d6b8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilder.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.task.batch.parallel.iterator;
 import org.apache.druid.data.input.HandlingInputRowIterator;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.indexing.common.task.IndexTask;
+import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.ParseException;
 import org.apache.druid.segment.indexing.granularity.GranularitySpec;
@@ -104,8 +105,8 @@ public class RangePartitionIndexTaskInputRowIteratorBuilder implements IndexTask
   )
   {
     return inputRow -> {
-      List<String> dimensionValues = inputRow.getDimension(partitionDimension);
-      return dimensionValues.size() != 1;
+      int dimensionValueCount = getSingleOrNullDimensionValueCount(inputRow, partitionDimension);
+      return dimensionValueCount != 1;
     };
   }
 
@@ -114,9 +115,22 @@ public class RangePartitionIndexTaskInputRowIteratorBuilder implements IndexTask
   )
   {
     return inputRow -> {
-      List<String> dimensionValues = inputRow.getDimension(partitionDimension);
-      return dimensionValues.size() > 1;  // Rows.objectToStrings() returns an empty list for a single null value
+      int dimensionValueCount = getSingleOrNullDimensionValueCount(inputRow, partitionDimension);
+      return dimensionValueCount > 1;  // Rows.objectToStrings() returns an empty list for a single null value
     };
   }
 
+  private static int getSingleOrNullDimensionValueCount(InputRow inputRow, String partitionDimension)
+  {
+    List<String> dimensionValues = inputRow.getDimension(partitionDimension);
+    int dimensionValueCount = dimensionValues.size();
+    if (dimensionValueCount > 1) {
+      throw new IAE(
+          "Cannot partition on multi-value dimension [%s] for input row [%s]",
+          partitionDimension,
+          inputRow
+      );
+    }
+    return dimensionValueCount;
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
index 5d905f0..ac7dbba 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTaskTest.java
@@ -237,7 +237,7 @@ public class PartialDimensionDistributionTaskTest
     }
 
     @Test
-    public void skipsRowsWithMultipleDimensionValues()
+    public void failsIfRowHasMultipleDimensionValues()
     {
       InputSource inlineInputSource = new InlineInputSource(
           ParallelIndexTestingFactory.createRow(0, Arrays.asList("a", "b"))
@@ -245,10 +245,10 @@ public class PartialDimensionDistributionTaskTest
       PartialDimensionDistributionTaskBuilder taskBuilder = new PartialDimensionDistributionTaskBuilder()
           .inputSource(inlineInputSource);
 
-      DimensionDistributionReport report = runTask(taskBuilder);
+      exception.expect(RuntimeException.class);
+      exception.expectMessage("Cannot partition on multi-value dimension [dim]");
 
-      Map<Interval, StringDistribution> intervalToDistribution = report.getIntervalToDistribution();
-      Assert.assertEquals(0, intervalToDistribution.size());
+      runTask(taskBuilder);
     }
 
     @Test
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionTaskInputRowIteratorBuilderTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
similarity index 85%
rename from indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionTaskInputRowIteratorBuilderTest.java
rename to indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
index 719535c..da53ba2 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionTaskInputRowIteratorBuilderTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/iterator/RangePartitionIndexTaskInputRowIteratorBuilderTest.java
@@ -25,13 +25,15 @@ import org.apache.druid.segment.indexing.granularity.GranularitySpec;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-public class RangePartitionTaskInputRowIteratorBuilderTest
+public class RangePartitionIndexTaskInputRowIteratorBuilderTest
 {
   private static final boolean SKIP_NULL = true;
   private static final IndexTaskInputRowIteratorBuilderTestingFactory.HandlerTester HANDLER_TESTER =
@@ -43,14 +45,17 @@ public class RangePartitionTaskInputRowIteratorBuilderTest
       );
   private static final InputRow NO_NEXT_INPUT_ROW = null;
 
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
   @Test
   public void invokesDimensionValueCountFilterLast()
   {
     DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
-    List<String> multipleDimensionValues = Arrays.asList("multiple", "dimension", "values");
+    List<String> nullDimensionValue = Collections.emptyList();  // Rows.objectToStrings() returns empty list for null
     InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(
         timestamp,
-        multipleDimensionValues
+        nullDimensionValue
     );
     CloseableIterator<InputRow> inputRowIterator = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(
         inputRow
@@ -78,6 +83,33 @@ public class RangePartitionTaskInputRowIteratorBuilderTest
   }
 
   @Test
+  public void throwsExceptionIfMultipleDimensionValues()
+  {
+    DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;
+    List<String> multipleDimensionValues = Arrays.asList("multiple", "dimension", "values");
+    InputRow inputRow = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRow(
+        timestamp,
+        multipleDimensionValues
+    );
+    CloseableIterator<InputRow> inputRowIterator = IndexTaskInputRowIteratorBuilderTestingFactory.createInputRowIterator(
+        inputRow
+    );
+    GranularitySpec granularitySpec = IndexTaskInputRowIteratorBuilderTestingFactory.createGranularitySpec(
+        timestamp,
+        IndexTaskInputRowIteratorBuilderTestingFactory.PRESENT_BUCKET_INTERVAL_OPT
+    );
+
+    exception.expect(IllegalArgumentException.class);
+    exception.expectMessage("Cannot partition on multi-value dimension [dimension]");
+
+    HANDLER_TESTER.invokeHandlers(
+        inputRowIterator,
+        granularitySpec,
+        NO_NEXT_INPUT_ROW
+    );
+  }
+
+  @Test
   public void doesNotInvokeHandlersIfRowValid()
   {
     DateTime timestamp = IndexTaskInputRowIteratorBuilderTestingFactory.TIMESTAMP;

