You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ab...@apache.org on 2023/02/23 06:08:09 UTC

[druid] branch master updated: Hadoop based batch ingestion support range partition (#13303)

This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 79f04e71a1 Hadoop based batch ingestion support range partition (#13303)
79f04e71a1 is described below

commit 79f04e71a1ef1a181dcf7f5a3d69ff76858446b9
Author: hqx871 <hq...@gmail.com>
AuthorDate: Thu Feb 23 14:08:03 2023 +0800

    Hadoop based batch ingestion support range partition (#13303)
    
    This PR implements range partitioning for Hadoop-based batch ingestion. For details about multi-dimension range partitioning, see #11848.
---
 .../druid/indexer/DeterminePartitionsJob.java      | 228 +++++++----
 .../HadoopDruidDetermineConfigurationJob.java      |   4 +-
 .../indexer/DetermineRangePartitionsJobTest.java   | 450 +++++++++++++++++++++
 .../druid/tests/hadoop/ITHadoopIndexTest.java      |   3 +
 .../partition/DimensionRangeShardSpecTest.java     |  55 +++
 5 files changed, 666 insertions(+), 74 deletions(-)

diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
index ebe69cd15a..11839b6931 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java
@@ -19,10 +19,12 @@
 
 package org.apache.druid.indexer;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -33,12 +35,17 @@ import com.google.common.io.Closeables;
 import org.apache.druid.collections.CombiningIterable;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.Rows;
+import org.apache.druid.data.input.StringTuple;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.hadoop.conf.Configurable;
@@ -71,6 +78,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -95,7 +104,7 @@ public class DeterminePartitionsJob implements Jobby
   private static final Logger log = new Logger(DeterminePartitionsJob.class);
 
   private static final Joiner TAB_JOINER = HadoopDruidIndexerConfig.TAB_JOINER;
-  private static final Splitter TAB_SPLITTER = HadoopDruidIndexerConfig.TAB_SPLITTER;
+  private static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");
 
   private final HadoopDruidIndexerConfig config;
 
@@ -119,15 +128,15 @@ public class DeterminePartitionsJob implements Jobby
        * in the final segment.
        */
 
-      if (!(config.getPartitionsSpec() instanceof SingleDimensionPartitionsSpec)) {
+      if (!(config.getPartitionsSpec() instanceof DimensionRangePartitionsSpec)) {
         throw new ISE(
-            "DeterminePartitionsJob can only be run for SingleDimensionPartitionsSpec, partitionSpec found [%s]",
+            "DeterminePartitionsJob can only be run for DimensionRangePartitionsSpec, partitionSpec found [%s]",
             config.getPartitionsSpec()
         );
       }
 
-      final SingleDimensionPartitionsSpec partitionsSpec =
-          (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
+      final DimensionRangePartitionsSpec partitionsSpec =
+          (DimensionRangePartitionsSpec) config.getPartitionsSpec();
 
       if (!partitionsSpec.isAssumeGrouped()) {
         groupByJob = Job.getInstance(
@@ -382,8 +391,7 @@ public class DeterminePartitionsJob implements Jobby
     protected void setup(Context context)
     {
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
-      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+      helper = new DeterminePartitionsDimSelectionMapperHelper(config);
     }
 
     @Override
@@ -412,8 +420,7 @@ public class DeterminePartitionsJob implements Jobby
     {
       super.setup(context);
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
-      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+      helper = new DeterminePartitionsDimSelectionMapperHelper(config);
     }
 
     @Override
@@ -437,13 +444,25 @@ public class DeterminePartitionsJob implements Jobby
   static class DeterminePartitionsDimSelectionMapperHelper
   {
     private final HadoopDruidIndexerConfig config;
-    private final String partitionDimension;
+    private final List<List<String>> dimensionGroupingSet;
     private final Map<Long, Integer> intervalIndexes;
 
-    DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
+    DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config)
     {
       this.config = config;
-      this.partitionDimension = partitionDimension;
+
+      DimensionRangePartitionsSpec spec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
+      final DimensionsSpec dimensionsSpec = config.getSchema().getDataSchema().getDimensionsSpec();
+      this.dimensionGroupingSet = new ArrayList<>();
+      final List<String> partitionDimensions = spec.getPartitionDimensions();
+      // If partitionDimensions is not set, try each dimension on its own to find the best one.
+      if (partitionDimensions.isEmpty()) {
+        for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) {
+          dimensionGroupingSet.add(Collections.singletonList(dimensionSchema.getName()));
+        }
+      } else {
+        dimensionGroupingSet.add(partitionDimensions);
+      }
 
       final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
       int idx = 0;
@@ -476,22 +495,29 @@ public class DeterminePartitionsJob implements Jobby
       final byte[] groupKey = buf.array();
 
       // Emit row-counter value.
-      write(context, groupKey, new DimValueCount("", "", 1));
-
-      for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
-        final String dim = dimAndValues.getKey();
-
-        if (partitionDimension == null || partitionDimension.equals(dim)) {
-          final Iterable<String> dimValues = dimAndValues.getValue();
-
-          if (Iterables.size(dimValues) == 1) {
-            // Emit this value.
-            write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
+      write(context, groupKey, new DimValueCount(Collections.emptyList(), StringTuple.create(), 1));
+
+      Iterator<List<String>> dimensionGroupIterator = dimensionGroupingSet.iterator();
+      while (dimensionGroupIterator.hasNext()) {
+        List<String> dimensionGroup = dimensionGroupIterator.next();
+        String[] values = new String[dimensionGroup.size()];
+        int numRow = 1;
+        for (int i = 0; i < dimensionGroup.size(); i++) {
+          String dimension = dimensionGroup.get(i);
+          final Iterable<String> dimValues = dims.get(dimension);
+          if (dimValues != null && Iterables.size(dimValues) == 1) {
+            values[i] = Iterables.getOnlyElement(dimValues);
+          } else if (dimValues == null || Iterables.isEmpty(dimValues)) {
+            // Leave values[i] as null when the dimension has no value (missing or empty).
           } else {
             // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
-            write(context, groupKey, new DimValueCount(dim, "", -1));
+            numRow = -1;
           }
         }
+        write(context, groupKey, new DimValueCount(dimensionGroup, StringTuple.create(values), numRow));
+        if (numRow == -1) {
+          dimensionGroupIterator.remove();
+        }
       }
     }
   }
@@ -572,12 +598,30 @@ public class DeterminePartitionsJob implements Jobby
 
     private static Iterable<DimValueCount> combineRows(Iterable<Text> input)
     {
+      final Comparator<List<String>> dimsComparator = new Comparator<List<String>>()
+      {
+        @Override
+        public int compare(List<String> o1, List<String> o2)
+        {
+          int len = Math.min(o1.size(), o2.size());
+          for (int i = 0; i < len; i++) {
+            int comparison = o1.get(i).compareTo(o2.get(i));
+            if (comparison != 0) {
+              return comparison;
+            }
+          }
+          return Integer.compare(o1.size(), o2.size());
+        }
+      };
       return new CombiningIterable<>(
           Iterables.transform(
               input,
               DimValueCount::fromText
           ),
-          (o1, o2) -> ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result(),
+          (o1, o2) -> ComparisonChain.start()
+                                     .compare(o1.dims, o2.dims, dimsComparator)
+                                     .compare(o1.values, o2.values)
+                                     .result(),
           (arg1, arg2) -> {
             if (arg2 == null) {
               return arg1;
@@ -585,7 +629,7 @@ public class DeterminePartitionsJob implements Jobby
 
             // Respect "poisoning" (negative values mean we can't use this dimension)
             final long newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
-            return new DimValueCount(arg1.dim, arg1.value, newNumRows);
+            return new DimValueCount(arg1.dims, arg1.values, newNumRows);
           }
       );
     }
@@ -603,6 +647,28 @@ public class DeterminePartitionsJob implements Jobby
     }
   }
 
+  static DimensionRangeShardSpec createShardSpec(
+      boolean isSingleDim,
+      List<String> dimensions,
+      @Nullable StringTuple start,
+      @Nullable StringTuple end,
+      int partitionNum,
+      @Nullable Integer numCorePartitions
+  )
+  {
+    if (isSingleDim) {
+      return new SingleDimensionShardSpec(
+          Iterables.getOnlyElement(dimensions),
+          start == null ? null : start.get(0),
+          end == null ? null : end.get(0),
+          partitionNum,
+          numCorePartitions
+      );
+    } else {
+      return new DimensionRangeShardSpec(dimensions, start, end, partitionNum, numCorePartitions);
+    }
+  }
+
   public static class DeterminePartitionsDimSelectionReducer extends DeterminePartitionsDimSelectionBaseReducer
   {
     private static final double SHARD_COMBINE_THRESHOLD = 0.25;
@@ -626,25 +692,26 @@ public class DeterminePartitionsJob implements Jobby
       final DimValueCount firstDvc = iterator.next();
       final long totalRows = firstDvc.numRows;
 
-      if (!"".equals(firstDvc.dim) || !"".equals(firstDvc.value)) {
+      if (!firstDvc.dims.isEmpty() || firstDvc.values.size() != 0) {
         throw new IllegalStateException("Expected total row indicator on first k/v pair");
       }
 
       // "iterator" will now take us over many candidate dimensions
       DimPartitions currentDimPartitions = null;
       DimPartition currentDimPartition = null;
-      String currentDimPartitionStart = null;
+      StringTuple currentDimPartitionStart = null;
       boolean currentDimSkip = false;
 
       // We'll store possible partitions in here
-      final Map<String, DimPartitions> dimPartitionss = new HashMap<>();
+      final Map<List<String>, DimPartitions> dimPartitionss = new HashMap<>();
+      final DimensionRangePartitionsSpec partitionsSpec = (DimensionRangePartitionsSpec) config.getPartitionsSpec();
 
       while (iterator.hasNext()) {
         final DimValueCount dvc = iterator.next();
 
-        if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
+        if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
           // Starting a new dimension! Exciting!
-          currentDimPartitions = new DimPartitions(dvc.dim);
+          currentDimPartitions = new DimPartitions(dvc.dims);
           currentDimPartition = new DimPartition();
           currentDimPartitionStart = null;
           currentDimSkip = false;
@@ -652,7 +719,7 @@ public class DeterminePartitionsJob implements Jobby
 
         // Respect poisoning
         if (!currentDimSkip && dvc.numRows < 0) {
-          log.info("Cannot partition on multi-value dimension: %s", dvc.dim);
+          log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
           currentDimSkip = true;
         }
 
@@ -662,10 +729,11 @@ public class DeterminePartitionsJob implements Jobby
 
         // See if we need to cut a new partition ending immediately before this dimension value
         if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows > config.getTargetPartitionSize()) {
-          final ShardSpec shardSpec = new SingleDimensionShardSpec(
-              currentDimPartitions.dim,
+          final ShardSpec shardSpec = createShardSpec(
+              partitionsSpec instanceof SingleDimensionPartitionsSpec,
+              currentDimPartitions.dims,
               currentDimPartitionStart,
-              dvc.value,
+              dvc.values,
               currentDimPartitions.partitions.size(),
               // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
               // completeness. See SingleDimensionShardSpec.createChunk().
@@ -682,14 +750,14 @@ public class DeterminePartitionsJob implements Jobby
           currentDimPartition.shardSpec = shardSpec;
           currentDimPartitions.partitions.add(currentDimPartition);
           currentDimPartition = new DimPartition();
-          currentDimPartitionStart = dvc.value;
+          currentDimPartitionStart = dvc.values;
         }
 
         // Update counters
         currentDimPartition.cardinality++;
         currentDimPartition.rows += dvc.numRows;
 
-        if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
+        if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
           // Finalize the current dimension
 
           if (currentDimPartition.rows > 0) {
@@ -703,11 +771,12 @@ public class DeterminePartitionsJob implements Jobby
                   currentDimPartitions.partitions.size() - 1
               );
 
-              final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
+              final DimensionRangeShardSpec previousShardSpec = (DimensionRangeShardSpec) previousDimPartition.shardSpec;
 
-              shardSpec = new SingleDimensionShardSpec(
-                  currentDimPartitions.dim,
-                  previousShardSpec.getStart(),
+              shardSpec = createShardSpec(
+                  partitionsSpec instanceof SingleDimensionPartitionsSpec,
+                  currentDimPartitions.dims,
+                  previousShardSpec.getStartTuple(),
                   null,
                   previousShardSpec.getPartitionNum(),
                   // Set unknown core partitions size so that the legacy way is used for checking partitionHolder
@@ -721,8 +790,9 @@ public class DeterminePartitionsJob implements Jobby
               currentDimPartition.cardinality += previousDimPartition.cardinality;
             } else {
               // Create new shard
-              shardSpec = new SingleDimensionShardSpec(
-                  currentDimPartitions.dim,
+              shardSpec = createShardSpec(
+                  partitionsSpec instanceof SingleDimensionPartitionsSpec,
+                  currentDimPartitions.dims,
                   currentDimPartitionStart,
                   null,
                   currentDimPartitions.partitions.size(),
@@ -745,13 +815,13 @@ public class DeterminePartitionsJob implements Jobby
 
           log.info(
               "Completed dimension[%s]: %,d possible shards with %,d unique values",
-              currentDimPartitions.dim,
+              currentDimPartitions.dims,
               currentDimPartitions.partitions.size(),
               currentDimPartitions.getCardinality()
           );
 
           // Add ourselves to the partitions map
-          dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
+          dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
         }
       }
 
@@ -769,7 +839,7 @@ public class DeterminePartitionsJob implements Jobby
         if (dimPartitions.getRows() != totalRows) {
           log.info(
               "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
-              dimPartitions.dim,
+              dimPartitions.dims,
               dimPartitions.getRows(),
               totalRows
           );
@@ -779,11 +849,9 @@ public class DeterminePartitionsJob implements Jobby
 
         // Make sure none of these shards are oversized
         boolean oversized = false;
-        final SingleDimensionPartitionsSpec partitionsSpec =
-            (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
         for (final DimPartition partition : dimPartitions.partitions) {
           if (partition.rows > partitionsSpec.getMaxRowsPerSegment()) {
-            log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
+            log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
             oversized = true;
           }
         }
@@ -877,12 +945,12 @@ public class DeterminePartitionsJob implements Jobby
 
   private static class DimPartitions
   {
-    public final String dim;
+    public final List<String> dims;
     public final List<DimPartition> partitions = new ArrayList<>();
 
-    private DimPartitions(String dim)
+    private DimPartitions(List<String> dims)
     {
-      this.dim = dim;
+      this.dims = dims;
     }
 
     int getCardinality()
@@ -923,32 +991,46 @@ public class DeterminePartitionsJob implements Jobby
     public long rows = 0;
   }
 
-  private static class DimValueCount
+  public static class DimValueCount
   {
-    public final String dim;
-    public final String value;
-    public final long numRows;
-
-    private DimValueCount(String dim, String value, long numRows)
+    @JsonProperty
+    private final List<String> dims;
+    @JsonProperty
+    private final StringTuple values;
+    @JsonProperty
+    private final long numRows;
+
+    @JsonCreator
+    public DimValueCount(
+        @JsonProperty("dims") List<String> dims,
+        @JsonProperty("values") StringTuple values,
+        @JsonProperty("numRows") long numRows
+    )
     {
-      this.dim = dim;
-      this.value = value;
+      this.dims = dims;
+      this.values = values;
       this.numRows = numRows;
     }
 
-    Text toText()
+    public Text toText()
     {
-      return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value));
+      try {
+        String jsonValue = HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(this);
+        return new Text(jsonValue);
+      }
+      catch (JsonProcessingException e) {
+        throw new ISE(e, "to json %s error", toString());
+      }
     }
 
-    static DimValueCount fromText(Text text)
+    public static DimValueCount fromText(Text text)
     {
-      final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
-      final String dim = splits.next();
-      final long numRows = Long.parseLong(splits.next());
-      final String value = splits.next();
-
-      return new DimValueCount(dim, value, numRows);
+      try {
+        return HadoopDruidIndexerConfig.JSON_MAPPER.readValue(text.toString(), DimValueCount.class);
+      }
+      catch (IOException e) {
+        throw new ISE(e, "parse json %s error", text.toString());
+      }
     }
   }
 
@@ -959,8 +1041,10 @@ public class DeterminePartitionsJob implements Jobby
   )
       throws IOException, InterruptedException
   {
-    byte[] sortKey = TAB_JOINER.join(dimValueCount.dim, dimValueCount.value)
-                               .getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
+    byte[] sortKey = TAB_JOINER.join(
+        COMMA_JOINER.join(dimValueCount.dims),
+        COMMA_JOINER.join(dimValueCount.values.toArray())
+    ).getBytes(HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET);
     context.write(new SortableBytes(groupKey, sortKey).toBytesWritable(), dimValueCount.toText());
   }
 }
diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
index ea37db1a10..aae23e9c12 100644
--- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
+++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidDetermineConfigurationJob.java
@@ -21,9 +21,9 @@ package org.apache.druid.indexer;
 
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -114,7 +114,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
     final PartitionsSpec partitionsSpec = config.getPartitionsSpec();
     if (partitionsSpec instanceof HashedPartitionsSpec) {
       return new DetermineHashedPartitionsJob(config);
-    } else if (partitionsSpec instanceof SingleDimensionPartitionsSpec) {
+    } else if (partitionsSpec instanceof DimensionRangePartitionsSpec) {
       return new DeterminePartitionsJob(config);
     } else {
       throw new ISE("Unknown partitionsSpec[%s]", partitionsSpec);
diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
new file mode 100644
index 0000000000..83a9bd58e5
--- /dev/null
+++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineRangePartitionsJobTest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.data.input.impl.CSVParseSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+
+@RunWith(Parameterized.class)
+public class DetermineRangePartitionsJobTest
+{
+  private HadoopDruidIndexerConfig config;
+  private int expectedNumOfSegments;
+  private int[] expectedNumOfShardsForEachSegment;
+  private String[][][][] expectedStartEndForEachShard;
+  private File dataFile;
+  private File tmpDir;
+
+  @Parameterized.Parameters(name = "assumeGrouped={0}, "
+                                   + "targetPartitionSize={1}, "
+                                   + "interval={2}"
+                                   + "expectedNumOfSegments={3}, "
+                                   + "expectedNumOfShardsForEachSegment={4}, "
+                                   + "expectedStartEndForEachShard={5}, "
+                                   + "data={6}, "
+                                   + "partitionDimensions={7}")
+  public static Collection<Object[]> constructFeed()
+  {
+    return Arrays.asList(
+        new Object[][]{
+            {
+                false,
+                3,
+                "2014-10-20T00:00:00Z/P1D",
+                1,
+                new int[]{4},
+                new String[][][][]{
+                    {
+                        {null, {"d.example.com"}},
+                        {{"d.example.com"}, {"g.example.com"}},
+                        {{"g.example.com"}, {"j.example.com"}},
+                        {{"j.example.com"}, null},
+                        }
+                },
+                ImmutableList.of(
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,c.example.com,US,200",
+                    "2014102000,c.example.com,US,200",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,i.example.com,US,963",
+                    "2014102000,i.example.com,US,963",
+                    "2014102000,j.example.com,US,333",
+                    "2014102000,j.example.com,US,333"
+                ),
+                ImmutableList.of("host")
+            },
+            {
+                true,
+                3,
+                "2014-10-22T00:00:00Z/P1D",
+                1,
+                new int[]{4},
+                new String[][][][]{
+                    {
+                        {null, {"d.example.com", "US"}},
+                        {{"d.example.com", "US"}, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, {"j.example.com", "US"}},
+                        {{"j.example.com", "US"}, null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102200,a.example.com,CN,100",
+                    "2014102200,b.exmaple.com,US,50",
+                    "2014102200,c.example.com,US,200",
+                    "2014102200,d.example.com,US,250",
+                    "2014102200,e.example.com,US,123",
+                    "2014102200,f.example.com,US,567",
+                    "2014102200,g.example.com,US,11",
+                    "2014102200,h.example.com,US,251",
+                    "2014102200,i.example.com,US,963",
+                    "2014102200,j.example.com,US,333"
+                ),
+                ImmutableList.of("host", "country")
+            },
+            {
+                false,
+                3,
+                "2014-10-20T00:00:00Z/P1D",
+                1,
+                new int[]{4},
+                new String[][][][]{
+                    {
+                        {null, {"d.example.com", "US"}},
+                        {{"d.example.com", "US"}, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, {"j.example.com", "US"}},
+                        {{"j.example.com", "US"}, null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,c.example.com,US,200",
+                    "2014102000,c.example.com,US,200",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,i.example.com,US,963",
+                    "2014102000,i.example.com,US,963",
+                    "2014102000,j.example.com,US,333",
+                    "2014102000,j.example.com,US,333"
+                ),
+                ImmutableList.of("host", "country")
+            },
+            {
+                true,
+                6,
+                "2014-10-20T00:00:00Z/P3D",
+                3,
+                new int[]{2, 2, 2},
+                new String[][][][]{
+                    {
+                        {null, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, null}
+                    },
+                    {
+                        {null, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, null}
+                    },
+                    {
+                        {null, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,b.exmaple.com,CN,50",
+                    "2014102000,c.example.com,CN,200",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,i.example.com,US,963",
+                    "2014102000,j.example.com,US,333",
+                    "2014102000,k.example.com,US,555",
+                    "2014102100,a.example.com,CN,100",
+                    "2014102100,b.exmaple.com,CN,50",
+                    "2014102100,c.example.com,CN,200",
+                    "2014102100,d.example.com,US,250",
+                    "2014102100,e.example.com,US,123",
+                    "2014102100,f.example.com,US,567",
+                    "2014102100,g.example.com,US,11",
+                    "2014102100,h.example.com,US,251",
+                    "2014102100,i.example.com,US,963",
+                    "2014102100,j.example.com,US,333",
+                    "2014102100,k.example.com,US,555",
+                    "2014102200,a.example.com,CN,100",
+                    "2014102200,b.exmaple.com,CN,50",
+                    "2014102200,c.example.com,CN,200",
+                    "2014102200,d.example.com,US,250",
+                    "2014102200,e.example.com,US,123",
+                    "2014102200,f.example.com,US,567",
+                    "2014102200,g.example.com,US,11",
+                    "2014102200,h.example.com,US,251",
+                    "2014102200,i.example.com,US,963",
+                    "2014102200,j.example.com,US,333",
+                    "2014102200,k.example.com,US,555"
+                ),
+                ImmutableList.of("host", "country")
+            },
+            {
+                false,
+                2,
+                "2014-10-20T00:00:00Z/P1D",
+                1,
+                new int[]{5},
+                new String[][][][]{
+                    {
+                        {null, {"c.example.com", null}},
+                        {{"c.example.com", null}, {"e.example.com", "US"}},
+                        {{"e.example.com", "US"}, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, {"i.example.com", null}},
+                        {{"i.example.com", null}, null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,c.example.com,,200",
+                    "2014102000,c.example.com,,200",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,i.example.com,,963",
+                    "2014102000,i.example.com,,963",
+                    "2014102000,j.example.com,US,333",
+                    "2014102000,j.example.com,US,333"
+                ),
+                ImmutableList.of("host", "country")
+            },
+            {
+                true,
+                2,
+                "2014-10-20T00:00:00Z/P1D",
+                1,
+                new int[]{5},
+                new String[][][][]{
+                    {
+                        {null, {"c.example.com", null}},
+                        {{"c.example.com", null}, {"e.example.com", "US"}},
+                        {{"e.example.com", "US"}, {"g.example.com", "US"}},
+                        {{"g.example.com", "US"}, {"i.example.com", null}},
+                        {{"i.example.com", null}, null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102000,a.example.com,CN,100",
+                    "2014102000,b.exmaple.com,US,50",
+                    "2014102000,c.example.com,,200",
+                    "2014102000,d.example.com,US,250",
+                    "2014102000,e.example.com,US,123",
+                    "2014102000,f.example.com,US,567",
+                    "2014102000,g.example.com,US,11",
+                    "2014102000,h.example.com,US,251",
+                    "2014102000,i.example.com,,963",
+                    "2014102000,j.example.com,US,333"
+                ),
+                ImmutableList.of("host", "country")
+            }
+        }
+    );
+  }
+
+  public DetermineRangePartitionsJobTest( // writes `data` to a temp file and builds `config` from the arguments
+      boolean assumeGrouped, // forwarded to DimensionRangePartitionsSpec
+      Integer targetPartitionSize, // forwarded to DimensionRangePartitionsSpec
+      String interval, // parsed with Intervals.of for the granularity spec
+      int expectedNumOfSegments, // stored; compared with the shard-spec map size in testPartitionJob
+      int[] expectedNumOfShardsForEachSegment, // stored; expected shard count per map entry
+      String[][][][] expectedStartEndForEachShard, // stored; indexed [segment][shard][0 = start, 1 = end]
+      List<String> data, // lines written to the temp input file
+      List<String> partitionDimensions // forwarded to DimensionRangePartitionsSpec
+  ) throws IOException
+  {
+    this.expectedNumOfSegments = expectedNumOfSegments;
+    this.expectedNumOfShardsForEachSegment = expectedNumOfShardsForEachSegment;
+    this.expectedStartEndForEachShard = expectedStartEndForEachShard;
+
+    dataFile = Files.createTempFile("test_range_website_data", "tmp").toFile(); // NOTE(review): suffix "tmp" has no leading dot — confirm intended
+    dataFile.deleteOnExit(); // fallback cleanup; tearDown also deletes the file explicitly
+    tmpDir = FileUtils.createTempDir();
+    tmpDir.deleteOnExit(); // fallback cleanup; tearDown also deletes the directory explicitly
+
+    org.apache.commons.io.FileUtils.writeLines(dataFile, data); // one input row per element of `data`
+
+    config = new HadoopDruidIndexerConfig(
+        new HadoopIngestionSpec(
+            new DataSchema(
+                "website",
+                null,
+                null,
+                new AggregatorFactory[]{new LongSumAggregatorFactory("visited_num", "visited_num")},
+                new UniformGranularitySpec(
+                    Granularities.DAY,
+                    Granularities.NONE,
+                    ImmutableList.of(Intervals.of(interval))
+                ),
+                null,
+                HadoopDruidIndexerConfig.JSON_MAPPER.convertValue( // parser is converted to a Map before being handed to DataSchema
+                    new StringInputRowParser(
+                        new CSVParseSpec(
+                            new TimestampSpec("timestamp", "yyyyMMddHH", null), // rows start with values such as 2014102000
+                            new DimensionsSpec(
+                                DimensionsSpec.getDefaultSchemas(ImmutableList.of("host", "country"))
+                            ),
+                            null,
+                            ImmutableList.of("timestamp", "host", "country", "visited_num"), // CSV column names, in row order
+                            false,
+                            0
+                        ),
+                        null
+                    ),
+                    Map.class
+                ),
+                HadoopDruidIndexerConfig.JSON_MAPPER
+            ),
+            new HadoopIOConfig(
+                ImmutableMap.of( // path spec map: type "static" pointing at the temp data file
+                    "paths",
+                    dataFile.getCanonicalPath(),
+                    "type",
+                    "static"
+                ),
+                null,
+                tmpDir.getCanonicalPath()
+            ),
+            new HadoopTuningConfig(
+                tmpDir.getCanonicalPath(),
+                null,
+                new DimensionRangePartitionsSpec( // the partitions spec this test exercises
+                    targetPartitionSize,
+                    null,
+                    partitionDimensions,
+                    assumeGrouped
+                ),
+                null, // remaining arguments are positional; see the HadoopTuningConfig constructor for their meaning
+                null,
+                null,
+                null,
+                null,
+                null,
+                false,
+                false,
+                false,
+                false,
+                null,
+                false,
+                false,
+                null,
+                null,
+                false,
+                false,
+                null,
+                null,
+                null,
+                null,
+                null
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testPartitionJob() // runs DeterminePartitionsJob on `config` and checks the resulting shard specs
+  {
+    DeterminePartitionsJob job = new DeterminePartitionsJob(config);
+    job.run(); // NOTE(review): any result of run() is ignored here — confirm a failed run is caught by the assertions below
+
+    int shardNum = 0; // running shard index across all map entries
+    int segmentNum = 0; // index of the current map entry into the expected* arrays
+    Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());
+
+    for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema() // NOTE(review): relies on the map's iteration order matching the expected arrays
+                                                               .getTuningConfig()
+                                                               .getShardSpecs()
+                                                               .entrySet()) {
+      int partitionNum = 0; // restarts at 0 for every map entry
+      List<HadoopyShardSpec> specs = entry.getValue();
+      Assert.assertEquals(expectedNumOfShardsForEachSegment[segmentNum], specs.size());
+
+      for (HadoopyShardSpec spec : specs) {
+        DimensionRangeShardSpec actualSpec = (DimensionRangeShardSpec) spec.getActualSpec(); // cast fails the test if another shard spec type was produced
+        Assert.assertEquals(shardNum, spec.getShardNum());
+        Assert.assertArrayEquals(
+            expectedStartEndForEachShard[segmentNum][partitionNum][0], // expected start values; null when no start tuple is expected
+            actualSpec.getStartTuple() == null ? null : actualSpec.getStartTuple().toArray()
+        );
+        Assert.assertArrayEquals(
+            expectedStartEndForEachShard[segmentNum][partitionNum][1], // expected end values; null when no end tuple is expected
+            actualSpec.getEndTuple() == null ? null : actualSpec.getEndTuple().toArray()
+        );
+        Assert.assertEquals(partitionNum, actualSpec.getPartitionNum());
+        shardNum++;
+        partitionNum++;
+      }
+
+      segmentNum++;
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception // deletes the temp file and directory created in the constructor
+  {
+    org.apache.commons.io.FileUtils.forceDelete(dataFile); // temp input file holding the test rows
+    FileUtils.deleteDirectory(tmpDir); // temp directory whose path was passed into the Hadoop configs
+  }
+}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
index b8ab40bb49..f16f362175 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/hadoop/ITHadoopIndexTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
+import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
 import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
 import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import org.apache.druid.java.util.common.Pair;
@@ -96,6 +97,8 @@ public class ITHadoopIndexTest extends AbstractITBatchIndexTest
         {new SingleDimensionPartitionsSpec(1000, null, null, false)},
         {new SingleDimensionPartitionsSpec(1000, null, "page", false)},
         {new SingleDimensionPartitionsSpec(1000, null, null, true)},
+        {new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page"), true)},
+        {new DimensionRangePartitionsSpec(1000, null, ImmutableList.of("page", "user"), false)},
 
         //{new HashedPartitionsSpec(null, 3, null)} // this results in a bug where the segments have 0 rows
     };
diff --git a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
index 9c9fe99e95..fa49a48bcf 100644
--- a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
+++ b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionRangeShardSpecTest.java
@@ -136,6 +136,61 @@ public class DimensionRangeShardSpecTest
     );
   }
 
+  @Test
+  public void testShardSpecLookupWithNull() // lookup over four range shards whose boundary tuples contain null values
+  {
+    setDimensions("dim1", "dim2");
+
+    final DimensionRangeShardSpec shard0 = new DimensionRangeShardSpec( // no start tuple; end ("India", null)
+        dimensions,
+        null,
+        StringTuple.create("India", null),
+        1,
+        1
+    );
+
+    final DimensionRangeShardSpec shard1 = new DimensionRangeShardSpec( // start ("India", null); end ("Spain", "Valencia")
+        dimensions,
+        StringTuple.create("India", null),
+        StringTuple.create("Spain", "Valencia"),
+        10,
+        1
+    );
+
+    final DimensionRangeShardSpec shard2 = new DimensionRangeShardSpec( // start ("Spain", "Valencia"); end ("Tokyo", null)
+        dimensions,
+        StringTuple.create("Spain", "Valencia"),
+        StringTuple.create("Tokyo", null),
+        10, // NOTE(review): same value as shard1's fourth argument — confirm this is intentional
+        1
+    );
+
+    final DimensionRangeShardSpec shard3 = new DimensionRangeShardSpec( // start ("Tokyo", null); no end tuple
+        dimensions,
+        StringTuple.create("Tokyo", null),
+        null,
+        100,
+        1
+    );
+    final ShardSpecLookup lookup = shard0.getLookup(Arrays.asList(shard0, shard1, shard2, shard3));
+    final long timestamp = System.currentTimeMillis(); // same timestamp value is passed to every lookup below
+
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Delhi"))); // rows with both dimensions set
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Kolkata")));
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Japan", "Tokyo")));
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", "Barcelona")));
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", "Bengaluru")));
+    Assert.assertEquals(shard2, lookup.getShardSpec(timestamp, createRow("Spain", "Valencia"))); // equals the shard1/shard2 boundary tuple; expected in shard2
+    Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", "London")));
+
+    Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, null))); // rows with at least one null dimension value
+    Assert.assertEquals(shard0, lookup.getShardSpec(timestamp, createRow(null, "Lyon")));
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("India", null))); // equals the shard0/shard1 boundary tuple; expected in shard1
+    Assert.assertEquals(shard1, lookup.getShardSpec(timestamp, createRow("Spain", null)));
+    Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("Tokyo", null))); // equals the shard2/shard3 boundary tuple; expected in shard3
+    Assert.assertEquals(shard3, lookup.getShardSpec(timestamp, createRow("United Kingdom", null)));
+  }
+
   @Test
   public void testPossibleInDomain_withNullStart()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org