You are viewing a plain-text version of this content. The canonical version is in the commits@druid.apache.org mailing-list archive; its hyperlink was not preserved in this copy.
Posted to commits@druid.apache.org by ma...@apache.org on 2020/10/01 22:02:25 UTC

[druid] branch master updated: Fix the task id creation in CompactionTask (#10445)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new e282ab5  Fix the task id creation in CompactionTask (#10445)
e282ab5 is described below

commit e282ab5695bf18e3bd8b64e00a0333c1f713ff81
Author: Abhishek Agarwal <14...@users.noreply.github.com>
AuthorDate: Fri Oct 2 03:32:04 2020 +0530

    Fix the task id creation in CompactionTask (#10445)
    
    * Fix the task id creation in CompactionTask
    
    * Address review comments
    
    * Skip (via JUnit Assume) the hash- and range-partitioning compaction tests when segment locking is used
---
 .../druid/indexing/common/task/CompactionTask.java | 13 +++--
 .../parallel/ParallelIndexSupervisorTask.java      | 21 ++++---
 .../common/task/CompactionTaskParallelRunTest.java | 46 +++++++++++----
 .../parallel/ParallelIndexSupervisorTaskTest.java  | 67 ++++++++++++++++++++++
 4 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
index 62a9f26..ba2502f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 import org.apache.curator.shaded.com.google.common.base.Verify;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.indexing.ClientCompactionTaskQuery;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -361,10 +362,14 @@ public class CompactionTask extends AbstractBatchIndexTask
           // a new Appenderator on its own instead. As a result, they should use different sequence names to allocate
           // new segmentIds properly. See IndexerSQLMetadataStorageCoordinator.allocatePendingSegments() for details.
           // In this case, we use different fake IDs for each created index task.
-          final String subtaskId = tuningConfig == null || tuningConfig.getMaxNumConcurrentSubTasks() == 1
-                                   ? createIndexTaskSpecId(i)
-                                   : getId();
-          return newTask(subtaskId, ingestionSpecs.get(i));
+          ParallelIndexIngestionSpec ingestionSpec = ingestionSpecs.get(i);
+          InputSource inputSource = ingestionSpec.getIOConfig().getNonNullInputSource(
+              ingestionSpec.getDataSchema().getParser()
+          );
+          final String subtaskId = ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig)
+                                   ? getId()
+                                   : createIndexTaskSpecId(i);
+          return newTask(subtaskId, ingestionSpec);
         })
         .collect(Collectors.toList());
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
index dd0e759..4a218a0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java
@@ -466,18 +466,25 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
     registerResourceCloserOnAbnormalExit(currentSubTaskHolder);
   }
 
-  private boolean isParallelMode()
+  public static boolean isParallelMode(InputSource inputSource, @Nullable ParallelIndexTuningConfig tuningConfig)
   {
+    if (null == tuningConfig) {
+      return false;
+    }
+    boolean useRangePartitions = useRangePartitions(tuningConfig);
     // Range partitioning is not implemented for runSequential() (but hash partitioning is)
-    int minRequiredNumConcurrentSubTasks = useRangePartitions() ? 1 : 2;
+    int minRequiredNumConcurrentSubTasks = useRangePartitions ? 1 : 2;
+    return inputSource.isSplittable() && tuningConfig.getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+  }
 
-    return baseInputSource.isSplittable()
-           && ingestionSchema.getTuningConfig().getMaxNumConcurrentSubTasks() >= minRequiredNumConcurrentSubTasks;
+  private static boolean useRangePartitions(ParallelIndexTuningConfig tuningConfig)
+  {
+    return tuningConfig.getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
   }
 
-  private boolean useRangePartitions()
+  private boolean isParallelMode()
   {
-    return ingestionSchema.getTuningConfig().getGivenOrDefaultPartitionsSpec() instanceof SingleDimensionPartitionsSpec;
+    return isParallelMode(baseInputSource, ingestionSchema.getTuningConfig());
   }
 
   /**
@@ -512,7 +519,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
    */
   private TaskStatus runMultiPhaseParallel(TaskToolbox toolbox) throws Exception
   {
-    return useRangePartitions()
+    return useRangePartitions(ingestionSchema.getTuningConfig())
            ? runRangePartitionMultiPhaseParallel(toolbox)
            : runHashPartitionMultiPhaseParallel(toolbox);
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
index 616dd24..7b7005b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskParallelRunTest.java
@@ -63,6 +63,7 @@ import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -152,7 +153,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
           lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
   }
@@ -161,9 +162,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   public void testRunParallelWithHashPartitioningMatchCompactionState()
   {
     // Hash partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -182,7 +181,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(HashBasedNumberedShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -192,9 +191,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
   public void testRunParallelWithRangePartitioning()
   {
     // Range partitioning is not supported with segment lock yet
-    if (lockGranularity == LockGranularity.SEGMENT) {
-      return;
-    }
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
     runIndexTask(null, true);
 
     final Builder builder = new Builder(
@@ -213,7 +210,36 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
         compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
     );
     for (DataSegment segment : compactedSegments) {
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
+      Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
+      Assert.assertEquals(expectedState, segment.getLastCompactionState());
+    }
+  }
+
+  @Test
+  public void testRunParallelWithRangePartitioningWithSingleTask()
+  {
+    // Range partitioning is not supported with segment lock yet
+    Assume.assumeFalse(lockGranularity == LockGranularity.SEGMENT);
+    runIndexTask(null, true);
+
+    final Builder builder = new Builder(
+        DATA_SOURCE,
+        getSegmentLoaderFactory(),
+        RETRY_POLICY_FACTORY
+    );
+    final CompactionTask compactionTask = builder
+        .inputSpec(new CompactionIntervalSpec(INTERVAL_TO_INDEX, null))
+        .tuningConfig(newTuningConfig(new SingleDimensionPartitionsSpec(7, null, "dim", false), 1, true))
+        .build();
+
+    final Set<DataSegment> compactedSegments = runTask(compactionTask);
+    final CompactionState expectedState = new CompactionState(
+        new SingleDimensionPartitionsSpec(7, null, "dim", false),
+        compactionTask.getTuningConfig().getIndexSpec().asMap(getObjectMapper())
+    );
+    for (DataSegment segment : compactedSegments) {
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertSame(SingleDimensionShardSpec.class, segment.getShardSpec().getClass());
       Assert.assertEquals(expectedState, segment.getLastCompactionState());
     }
@@ -242,7 +268,7 @@ public class CompactionTaskParallelRunTest extends AbstractParallelIndexSupervis
           lockGranularity == LockGranularity.TIME_CHUNK ? NumberedShardSpec.class : NumberedOverwriteShardSpec.class,
           segment.getShardSpec().getClass()
       );
-      // Expecte compaction state to exist as store compaction state by default
+      // Expect compaction state to exist as store compaction state by default
       Assert.assertEquals(null, segment.getLastCompactionState());
     }
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index 60c286a..ef1db8a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -21,11 +21,14 @@ package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Ordering;
+import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.JsonInputFormat;
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
+import org.apache.druid.indexer.partitions.PartitionsSpec;
+import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.segment.IndexSpec;
@@ -36,6 +39,7 @@ import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
 import org.apache.druid.timeline.partition.HashPartitionFunction;
+import org.easymock.EasyMock;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
@@ -55,6 +59,9 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.mock;
+
 @RunWith(Enclosed.class)
 public class ParallelIndexSupervisorTaskTest
 {
@@ -241,4 +248,64 @@ public class ParallelIndexSupervisorTaskTest
       );
     }
   }
+
+  public static class staticUtilsTest
+  {
+    @Test
+    public void testIsParallelModeFalse_nullTuningConfig()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, null));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_rangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(0).andReturn(1).andReturn(2);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_notRangePartition()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(true).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(PartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(1).andReturn(2).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+      Assert.assertTrue(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+
+    @Test
+    public void testIsParallelModeFalse_inputSourceNotSplittable()
+    {
+      InputSource inputSource = mock(InputSource.class);
+      expect(inputSource.isSplittable()).andReturn(false).anyTimes();
+
+      ParallelIndexTuningConfig tuningConfig = mock(ParallelIndexTuningConfig.class);
+      expect(tuningConfig.getGivenOrDefaultPartitionsSpec()).andReturn(mock(SingleDimensionPartitionsSpec.class))
+                                                            .anyTimes();
+      expect(tuningConfig.getMaxNumConcurrentSubTasks()).andReturn(3);
+      EasyMock.replay(inputSource, tuningConfig);
+
+      Assert.assertFalse(ParallelIndexSupervisorTask.isParallelMode(inputSource, tuningConfig));
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org