Posted to commits@druid.apache.org by ji...@apache.org on 2020/02/10 23:18:03 UTC

[druid] branch master updated: More superbatch range partitioning tests (#9266)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e8146d5  More superbatch range partitioning tests (#9266)
e8146d5 is described below

commit e8146d5914d832a14ba5e63cce6d5ffc31b18520
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Mon Feb 10 15:17:54 2020 -0800

    More superbatch range partitioning tests (#9266)
    
    More functional tests covering the handling of input data whose
    partition dimension contains:
    
    1) Null values: should be placed in the first partition
    
    2) Multi values: should cause the superbatch task to abort
---
 .../common/config/NullValueHandlingConfig.java     |   2 +-
 indexing-service/pom.xml                           |   5 +
 .../AbstractMultiPhaseParallelIndexingTest.java    |   5 +-
 .../AbstractParallelIndexSupervisorTaskTest.java   |   4 +
 ...ashPartitionMultiPhaseParallelIndexingTest.java |   4 +-
 ...ngePartitionMultiPhaseParallelIndexingTest.java | 149 +++++++++++++--------
 6 files changed, 109 insertions(+), 60 deletions(-)
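
As background for the first expectation above, here is a minimal sketch using
plain java.util rather than Druid's actual partition-boundary code (all names
are illustrative): ordering the partition dimension with a nulls-first
comparator is what places null values in the first range partition.

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    class NullsFirstRangePartitionSketch
    {
      public static void main(String[] args)
      {
        // Hypothetical dim1 values; null stands in for an empty CSV field.
        List<String> dim1 = Arrays.asList("7", null, "3", "9", null, "1");

        // Range partitioning orders rows by the partition dimension with
        // nulls first, so null values always fall into the first partition.
        dim1.sort(Comparator.nullsFirst(Comparator.naturalOrder()));

        int targetRowsPerSegment = 3;
        for (int i = 0; i < dim1.size(); i += targetRowsPerSegment) {
          List<String> partition =
              dim1.subList(i, Math.min(i + targetRowsPerSegment, dim1.size()));
          System.out.println("partition " + (i / targetRowsPerSegment) + ": " + partition);
        }
        // partition 0: [null, null, 1]   <- nulls land in the first partition
        // partition 1: [3, 7, 9]
      }
    }

The multi-value expectation is sketched after the diff below.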

diff --git a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
index ae15d95..fa05947 100644
--- a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
+++ b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 
 public class NullValueHandlingConfig
 {
-  private static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
+  public static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
 
   @JsonProperty("useDefaultValueForNull")
   private final boolean useDefaultValuesForNull;
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 1939778..82564cb 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -277,6 +277,11 @@
             <artifactId>equalsverifier</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.github.stefanbirkner</groupId>
+            <artifactId>system-rules</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
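
The system-rules dependency added above provides JUnit 4 rules for
manipulating system properties. A minimal sketch of how ProvideSystemProperty
is used (the test class here is hypothetical; the property name is the one
that the NullValueHandlingConfig change above makes publicly referenceable):

    import static org.junit.Assert.assertEquals;

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.contrib.java.lang.system.ProvideSystemProperty;

    public class NullHandlingPropertyExample
    {
      // Sets the property before each test and restores the old value after.
      @Rule
      public final ProvideSystemProperty nullHandling =
          new ProvideSystemProperty("druid.generic.useDefaultValueForNull", "false");

      @Test
      public void propertyIsSetWhileTestRuns()
      {
        assertEquals("false", System.getProperty("druid.generic.useDefaultValueForNull"));
      }
    }
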
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a7c4396..aaa31ad 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -108,7 +108,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
       File inputDir,
       String filter,
       DimensionBasedPartitionsSpec partitionsSpec,
-      int maxNumConcurrentSubTasks
+      int maxNumConcurrentSubTasks,
+      TaskState expectedTaskStatus
   ) throws Exception
   {
     final ParallelIndexSupervisorTask task = newTask(
@@ -129,7 +130,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
 
     TaskStatus taskStatus = task.run(toolbox);
 
-    Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
+    Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
     shutdownTask(task);
     return actionClient.getPublishedSegments();
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 029d2bf..9731272 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -170,6 +170,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
       catch (EntryExistsException e) {
         throw new RuntimeException(e);
       }
+
+      // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
+      // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
+      // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
       tasks.put(subTask.getId(), service.submit(() -> {
         try {
           final TaskToolbox toolbox = createTaskToolbox(subTask);
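
To illustrate the warning in the comment above: in production each subtask is
rebuilt from its JSON payload, so subtasks cannot share object instances. A
minimal sketch with plain Jackson and a hypothetical Spec class (not Druid's
task classes):

    import com.fasterxml.jackson.databind.ObjectMapper;

    public class PerTaskInstantiationSketch
    {
      // Hypothetical stand-in for a task spec holding a non-thread-safe helper.
      public static class Spec
      {
        public String name;
      }

      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new ObjectMapper();
        String json = "{\"name\":\"subtask\"}";

        // Production path: every subtask is deserialized from the HTTP
        // payload, so each one gets its own object graph.
        Spec first = mapper.readValue(json, Spec.class);
        Spec second = mapper.readValue(json, Spec.class);
        System.out.println(first == second);  // false: no shared state

        // The in-process test harness above instead hands the same pre-built
        // object to every subtask, so a non-thread-safe member (for example a
        // JSON flattener) would be shared across subtask threads.
      }
    }
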
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index 7219f16..4670d01 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -131,7 +132,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
         inputDir,
         "test_*",
         new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
-        MAX_NUM_CONCURRENT_SUB_TASKS
+        MAX_NUM_CONCURRENT_SUB_TASKS,
+        TaskState.SUCCESS
     );
     assertHashedPartition(publishedSegments);
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index fcdd5bb..a4b8332 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.SetMultimap;
 import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.common.config.NullValueHandlingConfig;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CSVParseSpec;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.ParseSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.indexing.common.LockGranularity;
 import org.apache.druid.indexing.common.TaskToolbox;
@@ -38,7 +40,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.scan.ScanResultValue;
 import org.apache.druid.timeline.DataSegment;
@@ -48,7 +49,9 @@ import org.hamcrest.Matchers;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -67,63 +70,76 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 @RunWith(Parameterized.class)
 public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
 {
+  private static final boolean USE_INPUT_FORMAT_API = true;
+  private static final boolean USE_MULTIVALUE_DIM = true;
   private static final int NUM_FILE = 10;
   private static final int NUM_ROW = 20;
-  private static final int NUM_DAY = 2;
+  private static final int DIM_FILE_CARDINALITY = 2;
   private static final int NUM_PARTITION = 2;
   private static final int YEAR = 2017;
+  private static final String TIME = "ts";
   private static final String DIM1 = "dim1";
   private static final String DIM2 = "dim2";
+  private static final String LIST_DELIMITER = "|";
   private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
   private static final String TEST_FILE_NAME_PREFIX = "test_";
   private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec(
-          "ts",
+          TIME,
           "auto",
           null
       ),
       new DimensionsSpec(
-          DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", DIM1, DIM2)),
-          new ArrayList<>(),
-          new ArrayList<>()
+          DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)),
+          Collections.emptyList(),
+          Collections.emptyList()
       ),
-      null,
-      Arrays.asList("ts", DIM1, DIM2, "val"),
+      LIST_DELIMITER,
+      Arrays.asList(TIME, DIM1, DIM2, "val"),
       false,
       0
   );
 
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false, 2},
-        new Object[]{LockGranularity.TIME_CHUNK, true, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 2},
-        new Object[]{LockGranularity.SEGMENT, true, 1}  // currently spawns subtask instead of running in supervisor
+        new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM},  // will spawn subtask
+        new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM}  // expected to fail
     );
   }
 
+  // Interpret empty values in CSV as null
+  @Rule
+  public final ProvideSystemProperty noDefaultNullValue = new ProvideSystemProperty(
+      NullValueHandlingConfig.NULL_HANDLING_CONFIG_STRING,
+      "false"
+  );
+
   private File inputDir;
-  private SetMultimap<Interval, String> intervalToDim1;
+  private SetMultimap<Interval, List<Object>> intervalToDims;
 
   private final int maxNumConcurrentSubTasks;
+  private final boolean useMultivalueDim;
 
   public RangePartitionMultiPhaseParallelIndexingTest(
       LockGranularity lockGranularity,
       boolean useInputFormatApi,
-      int maxNumConcurrentSubTasks
+      int maxNumConcurrentSubTasks,
+      boolean useMultivalueDim
   )
   {
     super(lockGranularity, useInputFormatApi);
     this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+    this.useMultivalueDim = useMultivalueDim;
   }
 
   @Override
@@ -132,41 +148,65 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
   {
     super.setup();
     inputDir = temporaryFolder.newFolder("data");
-    intervalToDim1 = createInputFiles(inputDir);
+    intervalToDims = createInputFiles(inputDir, useMultivalueDim);
   }
 
-  private static SetMultimap<Interval, String> createInputFiles(File inputDir) throws IOException
+  private static SetMultimap<Interval, List<Object>> createInputFiles(File inputDir, boolean useMultivalueDim)
+      throws IOException
   {
-    SetMultimap<Interval, String> intervalToDim1 = HashMultimap.create();
+    SetMultimap<Interval, List<Object>> intervalToDims = HashMultimap.create();
 
     for (int fileIndex = 0; fileIndex < NUM_FILE; fileIndex++) {
       Path path = new File(inputDir, TEST_FILE_NAME_PREFIX + fileIndex).toPath();
       try (final Writer writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
-        for (int i = 0; i < (NUM_ROW / NUM_DAY); i++) {
-          for (int d = 0; d < NUM_DAY; d++) {
-            writeRow(writer, i + d, fileIndex + d, intervalToDim1);
+        for (int i = 0; i < (NUM_ROW / DIM_FILE_CARDINALITY); i++) {
+          for (int d = 0; d < DIM_FILE_CARDINALITY; d++) {
+            int rowIndex = i * DIM_FILE_CARDINALITY + d;
+            String dim1Value = createDim1Value(rowIndex, fileIndex, useMultivalueDim);
+            writeRow(writer, i + d, dim1Value, fileIndex, intervalToDims);
           }
         }
       }
     }
 
-    return intervalToDim1;
+    return intervalToDims;
   }
 
-  private static void writeRow(Writer writer, int day, int fileIndex, Multimap<Interval, String> intervalToDim1)
-      throws IOException
+  @Nullable
+  private static String createDim1Value(int rowIndex, int fileIndex, boolean useMultivalueDim)
+  {
+    if (rowIndex == fileIndex) {
+      return null;
+    }
+
+    String dim1Value = String.valueOf(fileIndex);
+    return useMultivalueDim ? dim1Value + LIST_DELIMITER + dim1Value : dim1Value;
+  }
+
+  private static void writeRow(
+      Writer writer,
+      int day,
+      @Nullable String dim1Value,
+      int fileIndex,
+      Multimap<Interval, List<Object>> intervalToDims
+  ) throws IOException
   {
     Interval interval = Intervals.of("%s-12-%d/%s-12-%d", YEAR, day + 1, YEAR, day + 2);
     String startDate = interval.getStart().toString("y-M-d");
-    String dim1Value = String.valueOf(fileIndex + 10);
-    writer.write(StringUtils.format("%s,%s,%d th test file\n", startDate, dim1Value, fileIndex));
-    intervalToDim1.put(interval, dim1Value);
+    String dim2Value = "test file " + fileIndex;
+    String row = startDate + ",";
+    if (dim1Value != null) {
+      row += dim1Value;
+    }
+    row += "," + dim2Value + "\n";
+    writer.write(row);
+    intervalToDims.put(interval, Arrays.asList(dim1Value, dim2Value));
   }
 
   @Test
   public void createsCorrectRangePartitions() throws Exception
   {
-    int targetRowsPerSegment = NUM_ROW / NUM_DAY / NUM_PARTITION;
+    int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
     final Set<DataSegment> publishedSegments = runTestTask(
         PARSE_SPEC,
         Intervals.of("%s/%s", YEAR, YEAR + 1),
@@ -178,9 +218,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
             DIM1,
             false
         ),
-        maxNumConcurrentSubTasks
+        maxNumConcurrentSubTasks,
+        useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
     );
-    assertRangePartitions(publishedSegments);
+
+    if (!useMultivalueDim) {
+      assertRangePartitions(publishedSegments);
+    }
   }
 
   private void assertRangePartitions(Set<DataSegment> publishedSegments) throws IOException
@@ -188,16 +232,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     Multimap<Interval, DataSegment> intervalToSegments = ArrayListMultimap.create();
     publishedSegments.forEach(s -> intervalToSegments.put(s.getInterval(), s));
 
-    SortedSet<Interval> publishedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
-    publishedIntervals.addAll(intervalToSegments.keySet());
+    Set<Interval> publishedIntervals = intervalToSegments.keySet();
     assertHasExpectedIntervals(publishedIntervals);
 
-    Interval firstInterval = publishedIntervals.first();
-    Interval lastInterval = publishedIntervals.last();
     File tempSegmentDir = temporaryFolder.newFolder();
 
     intervalToSegments.asMap().forEach((interval, segments) -> {
-      assertNumPartition(interval, segments, firstInterval, lastInterval);
+      assertNumPartition(segments);
 
       List<String> allValues = new ArrayList<>(NUM_ROW);
       for (DataSegment segment : segments) {
@@ -212,22 +253,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
 
   private void assertHasExpectedIntervals(Set<Interval> publishedSegmentIntervals)
   {
-    Assert.assertEquals(intervalToDim1.keySet(), publishedSegmentIntervals);
+    Assert.assertEquals(intervalToDims.keySet(), publishedSegmentIntervals);
   }
 
-  private static void assertNumPartition(
-      Interval interval,
-      Collection<DataSegment> segments,
-      Interval firstInterval,
-      Interval lastInterval
-  )
+  private static void assertNumPartition(Collection<DataSegment> segments)
   {
-    int expectedNumPartition = NUM_PARTITION;
-    if (interval.equals(firstInterval) || interval.equals(lastInterval)) {
-      expectedNumPartition -= 1;
-    }
-    expectedNumPartition *= NUM_DAY;
-    Assert.assertEquals(expectedNumPartition, segments.size());
+    Assert.assertEquals(NUM_PARTITION, segments.size());
   }
 
   private List<String> getColumnValues(DataSegment segment, File tempDir)
@@ -253,18 +284,24 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
       }
 
       if (end != null) {
-        Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+        if (value == null) {
+          Assert.assertNull("null values should be in first partition", start);
+        } else {
+          Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+        }
       }
     }
   }
 
   private void assertIntervalHasAllExpectedValues(Interval interval, List<String> actualValues)
   {
-    List<String> expectedValues = new ArrayList<>(intervalToDim1.get(interval));
-    Assert.assertEquals(expectedValues.size(), actualValues.size());
-    Collections.sort(expectedValues);
-    Collections.sort(actualValues);
-    Assert.assertEquals(expectedValues, actualValues);
+    List<String> expectedValues = intervalToDims.get(interval)
+                                                .stream()
+                                                .map(d -> (String) d.get(0))
+                                                .sorted(Comparators.naturalNullsFirst())
+                                                .collect(Collectors.toList());
+    actualValues.sort(Comparators.naturalNullsFirst());
+    Assert.assertEquals(interval.toString(), expectedValues, actualValues);
   }
 
   @Override
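
Finally, a minimal sketch of why the multi-value case aborts, using plain
java.util: the split mirrors what CSVParseSpec does with listDelimiter = "|",
and the failure check is illustrative, not Druid's actual code.

    import java.util.Arrays;
    import java.util.List;

    public class MultiValueDimSketch
    {
      public static void main(String[] args)
      {
        // With listDelimiter "|", the CSV field "5|5" parses as the
        // multi-value dimension ["5", "5"], not the scalar "5|5".
        String field = "5|5";
        List<String> dim1 = Arrays.asList(field.split("\\|"));
        System.out.println(dim1);  // [5, 5]

        // Single-dimension range partitioning needs exactly one comparable
        // value per row to place it in a range. A multi-value row has no
        // well-defined partition, so the supervisor task fails rather than
        // silently picking one of the values.
        if (dim1.size() > 1) {
          throw new IllegalStateException("Multi-value partition dimension: " + dim1);
        }
      }
    }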

