Posted to commits@druid.apache.org by ab...@apache.org on 2023/02/23 06:08:09 UTC
[druid] branch master updated: Hadoop based batch ingestion support range partition (#13303)
This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 79f04e71a1 Hadoop based batch ingestion support range partition (#13303)
79f04e71a1 is described below
commit 79f04e71a1ef1a181dcf7f5a3d69ff76858446b9
Author: hqx871 <hq...@gmail.com>
AuthorDate: Thu Feb 23 14:08:03 2023 +0800
Hadoop based batch ingestion support range partition (#13303)
This PR implements range partitioning for Hadoop-based ingestion. For details on multi-dimension range partitioning, see #11848.
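For reference, here is a minimal, illustrative Java sketch (not part of the commit) of how multi-dimension range partitioning can be requested programmatically. It mirrors the constructor call used by the new DetermineRangePartitionsJobTest below; only the class and variable names of the sketch itself are assumptions.

// Sketch only: argument order follows the constructor used in DetermineRangePartitionsJobTest:
// targetRowsPerSegment, maxRowsPerSegment, partitionDimensions, assumeGrouped.
import com.google.common.collect.ImmutableList;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;

public class RangePartitionsSpecSketch
{
  public static void main(String[] args)
  {
    DimensionRangePartitionsSpec partitionsSpec = new DimensionRangePartitionsSpec(
        1000,                                // targetRowsPerSegment
        null,                                // maxRowsPerSegment (leave null when a target is given)
        ImmutableList.of("host", "country"), // partitionDimensions
        false                                // assumeGrouped
    );
    System.out.println(partitionsSpec.getPartitionDimensions());
  }
}

If partitionDimensions is left empty, the mapper in DeterminePartitionsJob treats every dimension from the dimensionsSpec as a candidate and the reducer picks the best one, matching the previous single-dimension behavior.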
---
.../druid/indexer/DeterminePartitionsJob.java | 228 +++++++----
.../HadoopDruidDetermineConfigurationJob.java | 4 +-
.../indexer/DetermineRangePartitionsJobTest.java | 450 +++++++++++++++++++++
.../druid/tests/hadoop/ITHadoopIndexTest.java | 3 +
.../partition/DimensionRangeShardSpecTest.java | 55 +++
5 files changed, 666 insertions(+), 74 deletions(-)
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index ebe69cd15a..11839b6931 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -19,10 +19,12 @@
package org.apache.druid.indexer;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -33,12 +35,17 @@ import com.google.common.io.Closeables;
import org.apache.druid.collections.CombiningIterable;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.Rows;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.hadoop.conf.Configurable;
@@ -71,6 +78,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -95,7 +104,7 @@ public class DeterminePartitionsJob implements Jobby
private static final Logger log = new Logger(DeterminePartitionsJob.class);
private static final Joiner TAB_JOINER = HadoopDruidIndexerConfig.TAB_JOINER;
- private static final Splitter TAB_SPLITTER = HadoopDruidIndexerConfig.TAB_SPLITTER;
+ private static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");
private final HadoopDruidIndexerConfig config;
@@ -119,15 +128,15 @@ public class DeterminePartitionsJob implements Jobby
* in the final segment.
*/
- if (!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)) {
+ if (!(config.getPartitionsSpec() instanceof DimensionRangePartitionsSpec)) {
throw new ISE(
- "DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]",
+ "DeterminePartitionsJob can only be run for DimensionRangePartitionsSpec, partitionSpec found [%s]",
config.getPartitionsSpec()
);
}
- final SingleDimensionPartitionsSpec partitionsSpec =
- (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
+ final DimensionRangePartitionsSpec partitionsSpec =
+ (DimensionRangePartitionsSpec) config.getPartitionsSpec();
if (!partitionsSpec.isAssumeGrouped()) {
groupByJob = Job.getInstance(
@@ -382,8 +391,7 @@ public class DeterminePartitionsJob implements Jobby
protected void setup(Context context)
{
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
- SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
- helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+ helper = new DeterminePartitionsDimSelectionMapperHelper(config);
}
@Override
@@ -412,8 +420,7 @@ public class DeterminePartitionsJob implements Jobby
{
super.setup(context);
final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
- final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
- helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+ helper = new DeterminePartitionsDimSelectionMapperHelper(config);
}
@Override
@@ -437,13 +444,25 @@ public class DeterminePartitionsJob implements Jobby
static class DeterminePartitionsDimSelectionMapperHelper
{
private final HadoopDruidIndexerConfig config;
- private final String partitionDimension;
+ private final List<List<String>> dimensionGroupingSet;
private final Map<Long, Integer> intervalIndexes;
- DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
+ DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config)
{
this.config = config;
- this.partitionDimension = partitionDimension;
+
+ DimensionRangePartitionsSpec spec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
+ final DimensionsSpec dimensionsSpec = config.getSchema().getDataSchema().getDimensionsSpec();
+ this.dimensionGroupingSet = new ArrayList<>();
+ final List<String> partitionDimensions = spec.getPartitionDimensions();
+ //if the partitionDimensions is not set, we just try every dimension to find the best one.
+ if (partitionDimensions.isEmpty()) {
+ for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) {
+ dimensionGroupingSet.add(Collections.singletonList(dimensionSchema.getName()));
+ }
+ } else {
+ dimensionGroupingSet.add(partitionDimensions);
+ }
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
@@ -476,22 +495,29 @@ public class DeterminePartitionsJob implements Jobby
final byte[] groupKey = buf.array();
// Emit row-counter value.
- write(context, groupKey, new DimValueCount("", "", 1));
-
- for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
- final String dim = dimAndValues.getKey();
-
- if (partitionDimension == null || partitionDimension.equals(dim)) {
- final Iterable<String> dimValues = dimAndValues.getValue();
-
- if (Iterables.size(dimValues) == 1) {
- // Emit this value.
- write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
+ write(context, groupKey, new DimValueCount(Collections.emptyList(), StringTuple.create(), 1));
+
+ Iterator<List<String>> dimensionGroupIterator = dimensionGroupingSet.iterator();
+ while (dimensionGroupIterator.hasNext()) {
+ List<String> dimensionGroup = dimensionGroupIterator.next();
+ String[] values = new String[dimensionGroup.size()];
+ int numRow = 1;
+ for (int i = 0; i < dimensionGroup.size(); i++) {
+ String dimension = dimensionGroup.get(i);
+ final Iterable<String> dimValues = dims.get(dimension);
+ if (dimValues != null && Iterables.size(dimValues) == 1) {
+ values[i] = Iterables.getOnlyElement(dimValues);
+ } else if (dimValues == null || Iterables.isEmpty(dimValues)) {
+ //just let values[i] be null when the dim value is null.
} else {
// This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
- write(context, groupKey, new DimValueCount(dim, "", -1));
+ numRow = -1;
}
}
+ write(context, groupKey, new DimValueCount(dimensionGroup, StringTuple.create(values), numRow));
+ if (numRow == -1) {
+ dimensionGroupIterator.remove();
+ }
}
}
}
@@ -572,12 +598,30 @@ public class DeterminePartitionsJob implements Jobby
private static Iterable<DimValueCount> combineRows(Iterable<Text> input)
{
+ final Comparator<List<String>> dimsComparator = new Comparator<List<String>>()
+ {
+ @Override
+ public int compare(List<String> o1, List<String> o2)
+ {
+ int len = Math.min(o1.size(), o2.size());
+ for (int i = 0; i < len; i++) {
+ int comparison = o1.get(i).compareTo(o2.get(i));
+ if (comparison != 0) {
+ return comparison;
+ }
+ }
+ return Integer.compare(o1.size(), o2.size());
+ }
+ };
return new CombiningIterable<>(
Iterables.transform(
input,
DimValueCount::fromText
),
- (o1, o2) -> ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result(),
+ (o1, o2) -> ComparisonChain.start()
+ .compare(o1.dims, o2.dims, dimsComparator)
+ .compare(o1.values, o2.values)
+ .result(),
(arg1, arg2) -> {
if (arg2 == null) {
return arg1;
@@ -585,7 +629,7 @@ public class DeterminePartitionsJob implements Jobby
// Respect "poisoning" (negative values mean we can't use this dimension)
final long newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
- return new DimValueCount(arg1.dim, arg1.value, newNumRows);
+ return new DimValueCount(arg1.dims, arg1.values, newNumRows);
}
);
}
@@ -603,6 +647,28 @@ public class DeterminePartitionsJob implements Jobby
}
}
+ static DimensionRangeShardSpec createShardSpec(
+ boolean isSingleDim,
+ List<String> dimensions,
+ @Nullable StringTuple start,
+ @Nullable StringTuple end,
+ int partitionNum,
+ @Nullable Integer numCorePartitions
+ )
+ {
+ if (isSingleDim) {
+ return new SingleDimensionShardSpec(
+ Iterables.getOnlyElement(dimensions),
+ start == null ? null : start.get(0),
+ end == null ? null : end.get(0),
+ partitionNum,
+ numCorePartitions
+ );
+ } else {
+ return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions);
+ }
+ }
+
public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
{
private static final double SHARD_COMBINE_THRESHOLD = 0.25;
@@ -626,25 +692,26 @@ public class DeterminePartitionsJob implements Jobby
final DimValueCount firstDvc = iterator.next();
final long totalRows = firstDvc.numRows;
- if (!"".equals(firstDvc.dim) || !"".equals(firstDvc.value)) {
+ if (!firstDvc.dims.isEmpty() || firstDvc.values.size() != 0) {
throw new IllegalStateException("Expected total row indicator on first k/v pair");
}
// "iterator" will now take us over many candidate dimensions
DimPartitions currentDimPartitions = null;
DimPartition currentDimPartition = null;
- String currentDimPartitionStart = null;
+ StringTuple currentDimPartitionStart = null;
boolean currentDimSkip = false;
// We'll store possible partitions in here
- final Map<String, DimPartitions> dimPartitionss = new HashMap<>();
+ final Map<List<String>, DimPartitions> dimPartitionss = new HashMap<>();
+ final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
while (iterator.hasNext()) {
final DimValueCount dvc = iterator.next();
- if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
+ if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
// Starting a new dimension! Exciting!
- currentDimPartitions = new DimPartitions(dvc.dim);
+ currentDimPartitions = new DimPartitions(dvc.dims);
currentDimPartition = new DimPartition();
currentDimPartitionStart = null;
currentDimSkip = false;
@@ -652,7 +719,7 @@ public class DeterminePartitionsJob implements Jobby
// Respect poisoning
if (!currentDimSkip && dvc.numRows < 0) {
- log.info("Cannot partition on multi-value dimension: %s", dvc.dim);
+ log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
currentDimSkip = true;
}
@@ -662,10 +729,11 @@ public class DeterminePartitionsJob implements Jobby
// See if we need to cut a new partition ending immediately before this dimension value
if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
- final ShardSpec shardSpec = new SingleDimensionShardSpec(
- currentDimPartitions.dim,
+ final ShardSpec shardSpec = createShardSpec(
+ partitionsSpec instanceof SingleDimensionPartitionsSpec,
+ currentDimPartitions.dims,
currentDimPartitionStart,
- dvc.value,
+ dvc.values,
currentDimPartitions.partitions.size(),
// Set unknown core partitions size so that the legacy way is used for checking partitionHolder
// completeness. See SingleDimensionShardSpec.createChunk().
@@ -682,14 +750,14 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartition.shardSpec = shardSpec;
currentDimPartitions.partitions.add(currentDimPartition);
currentDimPartition = new DimPartition();
- currentDimPartitionStart = dvc.value;
+ currentDimPartitionStart = dvc.values;
}
// Update counters
currentDimPartition.cardinality++;
currentDimPartition.rows += dvc.numRows;
- if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
+ if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
// Finalize the current dimension
if (currentDimPartition.rows > 0) {
@@ -703,11 +771,12 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartitions.partitions.size() - 1
);
- final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
+ final DimensionRangeShardSpec previousShardSpec = (DimensionRangeShardSpec) previousDimPartition.shardSpec;
- shardSpec = new SingleDimensionShardSpec(
- currentDimPartitions.dim,
- previousShardSpec.getStart(),
+ shardSpec = createShardSpec(
+ partitionsSpec instanceof SingleDimensionPartitionsSpec,
+ currentDimPartitions.dims,
+ previousShardSpec.getStartTuple(),
null,
previousShardSpec.getPartitionNum(),
// Set unknown core partitions size so that the legacy way is used for checking partitionHolder
@@ -721,8 +790,9 @@ public class DeterminePartitionsJob implements Jobby
currentDimPartition.cardinality += previousDimPartition.cardinality;
} else {
// Create new shard
- shardSpec = new SingleDimensionShardSpec(
- currentDimPartitions.dim,
+ shardSpec = createShardSpec(
+ partitionsSpec instanceof SingleDimensionPartitionsSpec,
+ currentDimPartitions.dims,
currentDimPartitionStart,
null,
currentDimPartitions.partitions.size(),
@@ -745,13 +815,13 @@ public class DeterminePartitionsJob implements Jobby
log.info(
"Completed dimension[%s]: %,d possible shards with %,d unique values",
- currentDimPartitions.dim,
+ currentDimPartitions.dims,
currentDimPartitions.partitions.size(),
currentDimPartitions.getCardinality()
);
// Add ourselves to the partitions map
- dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
+ dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
}
}
@@ -769,7 +839,7 @@ public class DeterminePartitionsJob implements Jobby
if (dimPartitions.getRows() != totalRows) {
log.info(
"Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
- dimPartitions.dim,
+ dimPartitions.dims,
dimPartitions.getRows(),
totalRows
);
@@ -779,11 +849,9 @@ public class DeterminePartitionsJob implements Jobby
// Make sure none of these shards are oversized
boolean oversized = false;
- final SingleDimensionPartitionsSpec partitionsSpec =
- (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
for (final DimPartition partition : dimPartitions.partitions) {
if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
- log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
+ log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
oversized = true;
}
}
@@ -877,12 +945,12 @@ public class DeterminePartitionsJob implements Jobby
private static class DimPartitions
{
- public final String dim;
+ public final List<String> dims;
public final List<DimPartition> partitions = new ArrayList<>();
- private DimPartitions(String dim)
+ private DimPartitions(List<String> dims)
{
- this.dim = dim;
+ this.dims = dims;
}
int getCardinality()
@@ -923,32 +991,46 @@ public class DeterminePartitionsJob implements Jobby
public long rows = 0;
}
- private static class DimValueCount
+ public static class DimValueCount
{
- public final String dim;
- public final String value;
- public final long numRows;
-
- private DimValueCount(String dim, String value, long numRows)
+ @JsonProperty
+ private final List<String> dims;
+ @JsonProperty
+ private final StringTuple values;
+ @JsonProperty
+ private final long numRows;
+
+ @JsonCreator
+ public DimValueCount(
+ @JsonProperty("dims") List<String> dims,
+ @JsonProperty("values") StringTuple values,
+ @JsonProperty("numRows") long numRows
+ )
{
- this.dim = dim;
- this.value = value;
+ this.dims = dims;
+ this.values = values;
this.numRows = numRows;
}
- Text toText()
+ public Text toText()
{
- return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value));
+ try {
+ String jsonValue = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(this);
+ return new Text(jsonValue);
+ }
+ catch (JsonProcessingException e) {
+ throw new ISE(e, "to json %s error", toString());
+ }
}
- static DimValueCount fromText(Text text)
+ public static DimValueCount fromText(Text text)
{
- final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
- final String dim = splits.next();
- final long numRows = Long.parseLong(splits.next());
- final String value = splits.next();
-
- return new DimValueCount(dim, value, numRows);
+ try {
+ return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(text.toString(), DimValueCount.class);
+ }
+ catch (IOException e) {
+ throw new ISE(e, "parse json %s error", text.toString());
+ }
}
}
@@ -959,8 +1041,10 @@ public class DeterminePartitionsJob implements Jobby
)
throws IOException, InterruptedException
{
- byte[] sortKey = TAB_JOINER.join(dimValueCount.dim, dimValueCount.value)
- .getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
+ byte[] sortKey = TAB_JOINER.join(
+ COMMA_JOINER.join(dimValueCount.dims),
+ COMMA_JOINER.join(dimValueCount.values.toArray())
+ ).getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
context.write(new SortableBytes(groupKey, sortKey).toBytesWritable(), dimValueCount.toText());
}
}
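A short sketch of the shard spec selection done by the createShardSpec() helper added above: a SingleDimensionPartitionsSpec still produces the legacy SingleDimensionShardSpec with plain string bounds, while any other DimensionRangePartitionsSpec produces a DimensionRangeShardSpec with StringTuple bounds. The boundary values below are illustrative only.

// Sketch only: mirrors the two branches inside createShardSpec().
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;

public class ShardSpecChoiceSketch
{
  public static void main(String[] args)
  {
    // Single partition dimension: keep the legacy shard spec; bounds are plain strings.
    ShardSpec single = new SingleDimensionShardSpec(
        "host", "d.example.com", "g.example.com", 0, null);

    // Multiple partition dimensions: bounds become StringTuples spanning all dimensions.
    ShardSpec range = new DimensionRangeShardSpec(
        ImmutableList.of("host", "country"),
        StringTuple.create("d.example.com", "US"),
        StringTuple.create("g.example.com", "US"),
        0,
        null);

    System.out.println(single);
    System.out.println(range);
  }
}

Passing null for numCorePartitions keeps the "unknown core partitions" behavior noted in the diff, so the legacy completeness check is used for the partition holder.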
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index ea37db1a10..aae23e9c12 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -21,9 +21,9 @@ package org.apache.druid.indexer;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -114,7 +114,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
if (partitionsSpec instanceof HashedPartitionsSpec) {
return new DetermineHashedPartitionsJob(config);
- } else if (partitionsSpec instanceof SingleDimensionPartitionsSpec) {
+ } else if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
return new DeterminePartitionsJob(config);
} else {
throw new ISE("Unknown partitionsSpec[%s]", partitionsSpec);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
new file mode 100644
index 0000000000..83a9bd58e5
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class DetermineRangePartitionsJobTest
+{
+ private HadoopDruidIndexerConfig config;
+ private int expectedNumOfSegments;
+ private int[] expectedNumOfShardsForEachSegment;
+ private String[][][][] expectedStartEndForEachShard;
+ private File dataFile;
+ private File tmpDir;
+
+ @Parameterized.Parameters(name = "assumeGrouped={0}, "
+ + "targetPartitionSize={1}, "
+ + "interval={2}"
+ + "expectedNumOfSegments={3}, "
+ + "expectedNumOfShardsForEachSegment={4}, "
+ + "expectedStartEndForEachShard={5}, "
+ + "data={6}, "
+ + "partitionDimensions={7}")
+ public static Collection<Object[]> constructFeed()
+ {
+ return Arrays.asList(
+ new Object[][]{
+ {
+ false,
+ 3,
+ "2014-10-20T00:00:00Z/P1D",
+ 1,
+ new int[]{4},
+ new String[][][][]{
+ {
+ {null, {"d.example.com"}},
+ {{"d.example.com"}, {"g.example.com"}},
+ {{"g.example.com"}, {"j.example.com"}},
+ {{"j.example.com"}, null},
+ }
+ },
+ ImmutableList.of(
+ "2014102000,a.example.com,CN,100",
+ "2014102000,a.example.com,CN,100",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,c.example.com,US,200",
+ "2014102000,c.example.com,US,200",
+ "2014102000,d.example.com,US,250",
+ "2014102000,d.example.com,US,250",
+ "2014102000,e.example.com,US,123",
+ "2014102000,e.example.com,US,123",
+ "2014102000,f.example.com,US,567",
+ "2014102000,f.example.com,US,567",
+ "2014102000,g.example.com,US,11",
+ "2014102000,g.example.com,US,11",
+ "2014102000,h.example.com,US,251",
+ "2014102000,h.example.com,US,251",
+ "2014102000,i.example.com,US,963",
+ "2014102000,i.example.com,US,963",
+ "2014102000,j.example.com,US,333",
+ "2014102000,j.example.com,US,333"
+ ),
+ ImmutableList.of("host")
+ },
+ {
+ true,
+ 3,
+ "2014-10-22T00:00:00Z/P1D",
+ 1,
+ new int[]{4},
+ new String[][][][]{
+ {
+ {null, {"d.example.com", "US"}},
+ {{"d.example.com", "US"}, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, {"j.example.com", "US"}},
+ {{"j.example.com", "US"}, null}
+ }
+ },
+ ImmutableList.of(
+ "2014102200,a.example.com,CN,100",
+ "2014102200,b.exmaple.com,US,50",
+ "2014102200,c.example.com,US,200",
+ "2014102200,d.example.com,US,250",
+ "2014102200,e.example.com,US,123",
+ "2014102200,f.example.com,US,567",
+ "2014102200,g.example.com,US,11",
+ "2014102200,h.example.com,US,251",
+ "2014102200,i.example.com,US,963",
+ "2014102200,j.example.com,US,333"
+ ),
+ ImmutableList.of("host", "country")
+ },
+ {
+ false,
+ 3,
+ "2014-10-20T00:00:00Z/P1D",
+ 1,
+ new int[]{4},
+ new String[][][][]{
+ {
+ {null, {"d.example.com", "US"}},
+ {{"d.example.com", "US"}, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, {"j.example.com", "US"}},
+ {{"j.example.com", "US"}, null}
+ }
+ },
+ ImmutableList.of(
+ "2014102000,a.example.com,CN,100",
+ "2014102000,a.example.com,CN,100",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,c.example.com,US,200",
+ "2014102000,c.example.com,US,200",
+ "2014102000,d.example.com,US,250",
+ "2014102000,d.example.com,US,250",
+ "2014102000,e.example.com,US,123",
+ "2014102000,e.example.com,US,123",
+ "2014102000,f.example.com,US,567",
+ "2014102000,f.example.com,US,567",
+ "2014102000,g.example.com,US,11",
+ "2014102000,g.example.com,US,11",
+ "2014102000,h.example.com,US,251",
+ "2014102000,h.example.com,US,251",
+ "2014102000,i.example.com,US,963",
+ "2014102000,i.example.com,US,963",
+ "2014102000,j.example.com,US,333",
+ "2014102000,j.example.com,US,333"
+ ),
+ ImmutableList.of("host", "country")
+ },
+ {
+ true,
+ 6,
+ "2014-10-20T00:00:00Z/P3D",
+ 3,
+ new int[]{2, 2, 2},
+ new String[][][][]{
+ {
+ {null, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, null}
+ },
+ {
+ {null, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, null}
+ },
+ {
+ {null, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, null}
+ }
+ },
+ ImmutableList.of(
+ "2014102000,a.example.com,CN,100",
+ "2014102000,b.exmaple.com,CN,50",
+ "2014102000,c.example.com,CN,200",
+ "2014102000,d.example.com,US,250",
+ "2014102000,e.example.com,US,123",
+ "2014102000,f.example.com,US,567",
+ "2014102000,g.example.com,US,11",
+ "2014102000,h.example.com,US,251",
+ "2014102000,i.example.com,US,963",
+ "2014102000,j.example.com,US,333",
+ "2014102000,k.example.com,US,555",
+ "2014102100,a.example.com,CN,100",
+ "2014102100,b.exmaple.com,CN,50",
+ "2014102100,c.example.com,CN,200",
+ "2014102100,d.example.com,US,250",
+ "2014102100,e.example.com,US,123",
+ "2014102100,f.example.com,US,567",
+ "2014102100,g.example.com,US,11",
+ "2014102100,h.example.com,US,251",
+ "2014102100,i.example.com,US,963",
+ "2014102100,j.example.com,US,333",
+ "2014102100,k.example.com,US,555",
+ "2014102200,a.example.com,CN,100",
+ "2014102200,b.exmaple.com,CN,50",
+ "2014102200,c.example.com,CN,200",
+ "2014102200,d.example.com,US,250",
+ "2014102200,e.example.com,US,123",
+ "2014102200,f.example.com,US,567",
+ "2014102200,g.example.com,US,11",
+ "2014102200,h.example.com,US,251",
+ "2014102200,i.example.com,US,963",
+ "2014102200,j.example.com,US,333",
+ "2014102200,k.example.com,US,555"
+ ),
+ ImmutableList.of("host", "country")
+ },
+ {
+ false,
+ 2,
+ "2014-10-20T00:00:00Z/P1D",
+ 1,
+ new int[]{5},
+ new String[][][][]{
+ {
+ {null, {"c.example.com", null}},
+ {{"c.example.com", null}, {"e.example.com", "US"}},
+ {{"e.example.com", "US"}, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, {"i.example.com", null}},
+ {{"i.example.com", null}, null}
+ }
+ },
+ ImmutableList.of(
+ "2014102000,a.example.com,CN,100",
+ "2014102000,a.example.com,CN,100",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,c.example.com,,200",
+ "2014102000,c.example.com,,200",
+ "2014102000,d.example.com,US,250",
+ "2014102000,d.example.com,US,250",
+ "2014102000,e.example.com,US,123",
+ "2014102000,e.example.com,US,123",
+ "2014102000,f.example.com,US,567",
+ "2014102000,f.example.com,US,567",
+ "2014102000,g.example.com,US,11",
+ "2014102000,g.example.com,US,11",
+ "2014102000,h.example.com,US,251",
+ "2014102000,h.example.com,US,251",
+ "2014102000,i.example.com,,963",
+ "2014102000,i.example.com,,963",
+ "2014102000,j.example.com,US,333",
+ "2014102000,j.example.com,US,333"
+ ),
+ ImmutableList.of("host", "country")
+ },
+ {
+ true,
+ 2,
+ "2014-10-20T00:00:00Z/P1D",
+ 1,
+ new int[]{5},
+ new String[][][][]{
+ {
+ {null, {"c.example.com", null}},
+ {{"c.example.com", null}, {"e.example.com", "US"}},
+ {{"e.example.com", "US"}, {"g.example.com", "US"}},
+ {{"g.example.com", "US"}, {"i.example.com", null}},
+ {{"i.example.com", null}, null}
+ }
+ },
+ ImmutableList.of(
+ "2014102000,a.example.com,CN,100",
+ "2014102000,b.exmaple.com,US,50",
+ "2014102000,c.example.com,,200",
+ "2014102000,d.example.com,US,250",
+ "2014102000,e.example.com,US,123",
+ "2014102000,f.example.com,US,567",
+ "2014102000,g.example.com,US,11",
+ "2014102000,h.example.com,US,251",
+ "2014102000,i.example.com,,963",
+ "2014102000,j.example.com,US,333"
+ ),
+ ImmutableList.of("host", "country")
+ }
+ }
+ );
+ }
+
+ public DetermineRangePartitionsJobTest(
+ boolean assumeGrouped,
+ Integer targetPartitionSize,
+ String interval,
+ int expectedNumOfSegments,
+ int[] expectedNumOfShardsForEachSegment,
+ String[][][][] expectedStartEndForEachShard,
+ List<String> data,
+ List<String> partitionDimensions
+ ) throws IOException
+ {
+ this.expectedNumOfSegments = expectedNumOfSegments;
+ this.expectedNumOfShardsForEachSegment = expectedNumOfShardsForEachSegment;
+ this.expectedStartEndForEachShard = expectedStartEndForEachShard;
+
+ dataFile = Files.createTempFile("test_range_website_data", "tmp").toFile();
+ dataFile.deleteOnExit();
+ tmpDir = FileUtils.createTempDir();
+ tmpDir.deleteOnExit();
+
+ org.apache.commons.io.FileUtils.writeLines(dataFile, data);
+
+ config = new HadoopDruidIndexerConfig(
+ new HadoopIngestionSpec(
+ new DataSchema(
+ "website",
+ null,
+ null,
+ new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
+ new UniformGranularitySpec(
+ Granularities.DAY,
+ Granularities.NONE,
+ ImmutableList.of(Intervals.of(interval))
+ ),
+ null,
+ HadoopDruidIndexerConfig.JSON_MAPPER.convertValue(
+ new StringInputRowParser(
+ new CSVParseSpec(
+ new TimestampSpec("timestamp", "yyyyMMddHH", null),
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country"))
+ ),
+ null,
+ ImmutableList.of("timestamp", "host", "country", "visited_num"),
+ false,
+ 0
+ ),
+ null
+ ),
+ Map.class
+ ),
+ HadoopDruidIndexerConfig.JSON_MAPPER
+ ),
+ new HadoopIOConfig(
+ ImmutableMap.of(
+ "paths",
+ dataFile.getCanonicalPath(),
+ "type",
+ "static"
+ ),
+ null,
+ tmpDir.getCanonicalPath()
+ ),
+ new HadoopTuningConfig(
+ tmpDir.getCanonicalPath(),
+ null,
+ new DimensionRangePartitionsSpec(
+ targetPartitionSize,
+ null,
+ partitionDimensions,
+ assumeGrouped
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false,
+ false,
+ false,
+ false,
+ null,
+ false,
+ false,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null,
+ null,
+ null
+ )
+ )
+ );
+ }
+
+ @Test
+ public void testPartitionJob()
+ {
+ DeterminePartitionsJob job = new DeterminePartitionsJob(config);
+ job.run();
+
+ int shardNum = 0;
+ int segmentNum = 0;
+ Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());
+
+ for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema()
+ .getTuningConfig()
+ .getShardSpecs()
+ .entrySet()) {
+ int partitionNum = 0;
+ List<HadoopyShardSpec> specs = entry.getValue();
+ Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size());
+
+ for (HadoopyShardSpec spec : specs) {
+ DimensionRangeShardSpec actualSpec = (DimensionRangeShardSpec) spec.getActualSpec();
+ Assert.assertEquals(shardNum, spec.getShardNum());
+ Assert.assertArrayEquals(
+ expectedStartEndForEachShard[segmentNum][partitionNum][0],
+ actualSpec.getStartTuple() == null ? null : actualSpec.getStartTuple().toArray()
+ );
+ Assert.assertArrayEquals(
+ expectedStartEndForEachShard[segmentNum][partitionNum][1],
+ actualSpec.getEndTuple() == null ? null : actualSpec.getEndTuple().toArray()
+ );
+ Assert.assertEquals(partitionNum, actualSpec.getPartitionNum());
+ shardNum++;
+ partitionNum++;
+ }
+
+ segmentNum++;
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ org.apache.commons.io.FileUtils.forceDelete(dataFile);
+ FileUtils.deleteDirectory(tmpDir);
+ }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index b8ab40bb49..f16f362175 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
import org.apache.druid.java.util.common.Pair;
@@ -96,6 +97,8 @@ public class ITHadoopIndexTest extends AbstractITBatchIndexTest
{new SingleDimensionPartitionsSpec(1000, null, null, false)},
{new SingleDimensionPartitionsSpec(1000, null, "page", false)},
{new SingleDimensionPartitionsSpec(1000, null, null, true)},
+ {new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page"), true)},
+ {new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page", "user"), false)},
//{new HashedPartitionsSpec(null, 3, null)} // this results in a bug where the segments have 0 rows
};
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
index 9c9fe99e95..fa49a48bcf 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
@@ -136,6 +136,61 @@ public class DimensionRangeShardSpecTest
);
}
+ @Test
+ public void testShardSpecLookupWithNull()
+ {
+ setDimensions("dim1", "dim2");
+
+ final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec(
+ dimensions,
+ null,
+ StringTuple.create("India", null),
+ 1,
+ 1
+ );
+
+ final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec(
+ dimensions,
+ StringTuple.create("India", null),
+ StringTuple.create("Spain", "Valencia"),
+ 10,
+ 1
+ );
+
+ final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec(
+ dimensions,
+ StringTuple.create("Spain", "Valencia"),
+ StringTuple.create("Tokyo", null),
+ 10,
+ 1
+ );
+
+ final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec(
+ dimensions,
+ StringTuple.create("Tokyo", null),
+ null,
+ 100,
+ 1
+ );
+ final ShardSpecLookup lookup = shard0.getLookup(Arrays.asList(shard0, shard1, shard2, shard3));
+ final long timestamp = System.currentTimeMillis();
+
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Delhi")));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Kolkata")));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Japan", "Tokyo")));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", "Barcelona")));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Bengaluru")));
+ Assert.assertEquals(shard2, lookup.getShardSpec(timestamp, createRow("Spain", "Valencia")));
+ Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", "London")));
+
+ Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, null)));
+ Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, "Lyon")));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", null)));
+ Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", null)));
+ Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("Tokyo", null)));
+ Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", null)));
+ }
+
@Test
public void testPossibleInDomain_withNullStart()
{