You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2020/02/10 23:18:03 UTC
[druid] branch master updated: More superbatch range partitioning tests (#9266)
This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e8146d5 More superbatch range partitioning tests (#9266)
e8146d5 is described below
commit e8146d5914d832a14ba5e63cce6d5ffc31b18520
Author: Chi Cao Minh <ch...@imply.io>
AuthorDate: Mon Feb 10 15:17:54 2020 -0800
More superbatch range partitioning tests (#9266)
More functional tests to cover handling of input data whose
partition dimension contains:
1) Null values: should be placed in the first partition
2) Multiple values: should cause the superbatch task to abort
---
.../common/config/NullValueHandlingConfig.java | 2 +-
indexing-service/pom.xml | 5 +
.../AbstractMultiPhaseParallelIndexingTest.java | 5 +-
.../AbstractParallelIndexSupervisorTaskTest.java | 4 +
...ashPartitionMultiPhaseParallelIndexingTest.java | 4 +-
...ngePartitionMultiPhaseParallelIndexingTest.java | 149 +++++++++++++--------
6 files changed, 109 insertions(+), 60 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
index ae15d95..fa05947 100644
--- a/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
+++ b/core/src/main/java/org/apache/druid/common/config/NullValueHandlingConfig.java
@@ -24,7 +24,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
public class NullValueHandlingConfig
{
- private static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
+ public static final String NULL_HANDLING_CONFIG_STRING = "druid.generic.useDefaultValueForNull";
@JsonProperty("useDefaultValueForNull")
private final boolean useDefaultValuesForNull;
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 1939778..82564cb 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -277,6 +277,11 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.stefanbirkner</groupId>
+ <artifactId>system-rules</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index a7c4396..aaa31ad 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -108,7 +108,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
File inputDir,
String filter,
DimensionBasedPartitionsSpec partitionsSpec,
- int maxNumConcurrentSubTasks
+ int maxNumConcurrentSubTasks,
+ TaskState expectedTaskStatus
) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
@@ -129,7 +130,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
TaskStatus taskStatus = task.run(toolbox);
- Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
+ Assert.assertEquals(expectedTaskStatus, taskStatus.getStatusCode());
shutdownTask(task);
return actionClient.getPublishedSegments();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
index 029d2bf..9731272 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
@@ -170,6 +170,10 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
catch (EntryExistsException e) {
throw new RuntimeException(e);
}
+
+ // WARNING: In production, subtasks are created via HTTP calls and instantiated by Jackson, which means they
+ // cannot share objects like they can here. For example, if the indexing task uses JsonParseSpec, the same
+ // JSONFlattenerMaker instance is shared among subtasks, which is bad since JSONFlattenerMaker is not thread-safe.
tasks.put(subTask.getId(), service.submit(() -> {
try {
final TaskToolbox toolbox = createTaskToolbox(subTask);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index 7219f16..4670d01 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -131,7 +132,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
inputDir,
"test_*",
new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
- MAX_NUM_CONCURRENT_SUB_TASKS
+ MAX_NUM_CONCURRENT_SUB_TASKS,
+ TaskState.SUCCESS
);
assertHashedPartition(publishedSegments);
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index fcdd5bb..a4b8332 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -25,11 +25,13 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.SetMultimap;
import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.common.config.NullValueHandlingConfig;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskToolbox;
@@ -38,7 +40,6 @@ import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.timeline.DataSegment;
@@ -48,7 +49,9 @@ import org.hamcrest.Matchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.ProvideSystemProperty;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -67,63 +70,76 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiPhaseParallelIndexingTest
{
+ private static final boolean USE_INPUT_FORMAT_API = true;
+ private static final boolean USE_MULTIVALUE_DIM = true;
private static final int NUM_FILE = 10;
private static final int NUM_ROW = 20;
- private static final int NUM_DAY = 2;
+ private static final int DIM_FILE_CARDINALITY = 2;
private static final int NUM_PARTITION = 2;
private static final int YEAR = 2017;
+ private static final String TIME = "ts";
private static final String DIM1 = "dim1";
private static final String DIM2 = "dim2";
+ private static final String LIST_DELIMITER = "|";
private static final List<String> DIMS = ImmutableList.of(DIM1, DIM2);
private static final String TEST_FILE_NAME_PREFIX = "test_";
private static final ParseSpec PARSE_SPEC = new CSVParseSpec(
new TimestampSpec(
- "ts",
+ TIME,
"auto",
null
),
new DimensionsSpec(
- DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", DIM1, DIM2)),
- new ArrayList<>(),
- new ArrayList<>()
+ DimensionsSpec.getDefaultSchemas(Arrays.asList(TIME, DIM1, DIM2)),
+ Collections.emptyList(),
+ Collections.emptyList()
),
- null,
- Arrays.asList("ts", DIM1, DIM2, "val"),
+ LIST_DELIMITER,
+ Arrays.asList(TIME, DIM1, DIM2, "val"),
false,
0
);
- @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
+ @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}, useMultiValueDim={3}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, false, 2},
- new Object[]{LockGranularity.TIME_CHUNK, true, 2},
- new Object[]{LockGranularity.SEGMENT, true, 2},
- new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
+ new Object[]{LockGranularity.TIME_CHUNK, !USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+ new Object[]{LockGranularity.TIME_CHUNK, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, !USE_MULTIVALUE_DIM},
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 1, !USE_MULTIVALUE_DIM}, // will spawn subtask
+ new Object[]{LockGranularity.SEGMENT, USE_INPUT_FORMAT_API, 2, USE_MULTIVALUE_DIM} // expected to fail
);
}
+ // Interpret empty values in CSV as null
+ @Rule
+ public final ProvideSystemProperty noDefaultNullValue = new ProvideSystemProperty(
+ NullValueHandlingConfig.NULL_HANDLING_CONFIG_STRING,
+ "false"
+ );
+
private File inputDir;
- private SetMultimap<Interval, String> intervalToDim1;
+ private SetMultimap<Interval, List<Object>> intervalToDims;
private final int maxNumConcurrentSubTasks;
+ private final boolean useMultivalueDim;
public RangePartitionMultiPhaseParallelIndexingTest(
LockGranularity lockGranularity,
boolean useInputFormatApi,
- int maxNumConcurrentSubTasks
+ int maxNumConcurrentSubTasks,
+ boolean useMultivalueDim
)
{
super(lockGranularity, useInputFormatApi);
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
+ this.useMultivalueDim = useMultivalueDim;
}
@Override
@@ -132,41 +148,65 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
{
super.setup();
inputDir = temporaryFolder.newFolder("data");
- intervalToDim1 = createInputFiles(inputDir);
+ intervalToDims = createInputFiles(inputDir, useMultivalueDim);
}
- private static SetMultimap<Interval, String> createInputFiles(File inputDir) throws IOException
+ private static SetMultimap<Interval, List<Object>> createInputFiles(File inputDir, boolean useMultivalueDim)
+ throws IOException
{
- SetMultimap<Interval, String> intervalToDim1 = HashMultimap.create();
+ SetMultimap<Interval, List<Object>> intervalToDims = HashMultimap.create();
for (int fileIndex = 0; fileIndex < NUM_FILE; fileIndex++) {
Path path = new File(inputDir, TEST_FILE_NAME_PREFIX + fileIndex).toPath();
try (final Writer writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)) {
- for (int i = 0; i < (NUM_ROW / NUM_DAY); i++) {
- for (int d = 0; d < NUM_DAY; d++) {
- writeRow(writer, i + d, fileIndex + d, intervalToDim1);
+ for (int i = 0; i < (NUM_ROW / DIM_FILE_CARDINALITY); i++) {
+ for (int d = 0; d < DIM_FILE_CARDINALITY; d++) {
+ int rowIndex = i * DIM_FILE_CARDINALITY + d;
+ String dim1Value = createDim1Value(rowIndex, fileIndex, useMultivalueDim);
+ writeRow(writer, i + d, dim1Value, fileIndex, intervalToDims);
}
}
}
}
- return intervalToDim1;
+ return intervalToDims;
}
- private static void writeRow(Writer writer, int day, int fileIndex, Multimap<Interval, String> intervalToDim1)
- throws IOException
+ @Nullable
+ private static String createDim1Value(int rowIndex, int fileIndex, boolean useMultivalueDim)
+ {
+ if (rowIndex == fileIndex) {
+ return null;
+ }
+
+ String dim1Value = String.valueOf(fileIndex);
+ return useMultivalueDim ? dim1Value + LIST_DELIMITER + dim1Value : dim1Value;
+ }
+
+ private static void writeRow(
+ Writer writer,
+ int day,
+ @Nullable String dim1Value,
+ int fileIndex,
+ Multimap<Interval, List<Object>> intervalToDims
+ ) throws IOException
{
Interval interval = Intervals.of("%s-12-%d/%s-12-%d", YEAR, day + 1, YEAR, day + 2);
String startDate = interval.getStart().toString("y-M-d");
- String dim1Value = String.valueOf(fileIndex + 10);
- writer.write(StringUtils.format("%s,%s,%d th test file\n", startDate, dim1Value, fileIndex));
- intervalToDim1.put(interval, dim1Value);
+ String dim2Value = "test file " + fileIndex;
+ String row = startDate + ",";
+ if (dim1Value != null) {
+ row += dim1Value;
+ }
+ row += "," + dim2Value + "\n";
+ writer.write(row);
+ intervalToDims.put(interval, Arrays.asList(dim1Value, dim2Value));
}
@Test
public void createsCorrectRangePartitions() throws Exception
{
- int targetRowsPerSegment = NUM_ROW / NUM_DAY / NUM_PARTITION;
+ int targetRowsPerSegment = NUM_ROW / DIM_FILE_CARDINALITY / NUM_PARTITION;
final Set<DataSegment> publishedSegments = runTestTask(
PARSE_SPEC,
Intervals.of("%s/%s", YEAR, YEAR + 1),
@@ -178,9 +218,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
DIM1,
false
),
- maxNumConcurrentSubTasks
+ maxNumConcurrentSubTasks,
+ useMultivalueDim ? TaskState.FAILED : TaskState.SUCCESS
);
- assertRangePartitions(publishedSegments);
+
+ if (!useMultivalueDim) {
+ assertRangePartitions(publishedSegments);
+ }
}
private void assertRangePartitions(Set<DataSegment> publishedSegments) throws IOException
@@ -188,16 +232,13 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
Multimap<Interval, DataSegment> intervalToSegments = ArrayListMultimap.create();
publishedSegments.forEach(s -> intervalToSegments.put(s.getInterval(), s));
- SortedSet<Interval> publishedIntervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
- publishedIntervals.addAll(intervalToSegments.keySet());
+ Set<Interval> publishedIntervals = intervalToSegments.keySet();
assertHasExpectedIntervals(publishedIntervals);
- Interval firstInterval = publishedIntervals.first();
- Interval lastInterval = publishedIntervals.last();
File tempSegmentDir = temporaryFolder.newFolder();
intervalToSegments.asMap().forEach((interval, segments) -> {
- assertNumPartition(interval, segments, firstInterval, lastInterval);
+ assertNumPartition(segments);
List<String> allValues = new ArrayList<>(NUM_ROW);
for (DataSegment segment : segments) {
@@ -212,22 +253,12 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
private void assertHasExpectedIntervals(Set<Interval> publishedSegmentIntervals)
{
- Assert.assertEquals(intervalToDim1.keySet(), publishedSegmentIntervals);
+ Assert.assertEquals(intervalToDims.keySet(), publishedSegmentIntervals);
}
- private static void assertNumPartition(
- Interval interval,
- Collection<DataSegment> segments,
- Interval firstInterval,
- Interval lastInterval
- )
+ private static void assertNumPartition(Collection<DataSegment> segments)
{
- int expectedNumPartition = NUM_PARTITION;
- if (interval.equals(firstInterval) || interval.equals(lastInterval)) {
- expectedNumPartition -= 1;
- }
- expectedNumPartition *= NUM_DAY;
- Assert.assertEquals(expectedNumPartition, segments.size());
+ Assert.assertEquals(NUM_PARTITION, segments.size());
}
private List<String> getColumnValues(DataSegment segment, File tempDir)
@@ -253,18 +284,24 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
}
if (end != null) {
- Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+ if (value == null) {
+ Assert.assertNull("null values should be in first partition", start);
+ } else {
+ Assert.assertThat(value.compareTo(end), Matchers.lessThan(0));
+ }
}
}
}
private void assertIntervalHasAllExpectedValues(Interval interval, List<String> actualValues)
{
- List<String> expectedValues = new ArrayList<>(intervalToDim1.get(interval));
- Assert.assertEquals(expectedValues.size(), actualValues.size());
- Collections.sort(expectedValues);
- Collections.sort(actualValues);
- Assert.assertEquals(expectedValues, actualValues);
+ List<String> expectedValues = intervalToDims.get(interval)
+ .stream()
+ .map(d -> (String) d.get(0))
+ .sorted(Comparators.naturalNullsFirst())
+ .collect(Collectors.toList());
+ actualValues.sort(Comparators.naturalNullsFirst());
+ Assert.assertEquals(interval.toString(), expectedValues, actualValues);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org