Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/27 21:39:56 UTC

[GitHub] jihoonson closed pull request #5734: Multiple dimension partitioning spec

URL: https://github.com/apache/incubator-druid/pull/5734

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/common/src/main/java/io/druid/timeline/partition/RangePartitionChunk.java b/common/src/main/java/io/druid/timeline/partition/RangePartitionChunk.java
new file mode 100644
index 00000000000..92af8bdd1c2
--- /dev/null
+++ b/common/src/main/java/io/druid/timeline/partition/RangePartitionChunk.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.timeline.partition;
+
+import com.google.common.collect.Range;
+
+/**
+ */
+public class RangePartitionChunk<T> implements PartitionChunk<T>
+{
+  private final Range range;
+  private final int chunkNumber;
+  private final T object;
+
+  public RangePartitionChunk(
+      Range range,
+      int chunkNumber,
+      T object
+  )
+  {
+    this.range = range;
+    this.chunkNumber = chunkNumber;
+    this.object = object;
+  }
+
+  @Override
+  public T getObject()
+  {
+    return object;
+  }
+
+  @Override
+  public boolean abuts(PartitionChunk<T> chunk)
+  {
+    if (chunk instanceof RangePartitionChunk) {
+      RangePartitionChunk<T> rangeChunk = (RangePartitionChunk<T>) chunk;
+
+      return !rangeChunk.isStart() &&
+          range.isConnected(rangeChunk.range) &&
+          range.intersection(rangeChunk.range).isEmpty();
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean isStart()
+  {
+    return !range.hasLowerBound();
+  }
+
+  @Override
+  public boolean isEnd()
+  {
+    return !range.hasUpperBound();
+  }
+
+  @Override
+  public int getChunkNumber()
+  {
+    return chunkNumber;
+  }
+
+  @Override
+  public int compareTo(PartitionChunk<T> chunk)
+  {
+    if (chunk instanceof RangePartitionChunk) {
+      RangePartitionChunk<T> rangeChunk = (RangePartitionChunk<T>) chunk;
+
+      return Integer.compare(chunkNumber, rangeChunk.chunkNumber);
+    }
+    throw new IllegalArgumentException("Cannot compare against something that is not a RangePartitionChunk.");
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    return compareTo((RangePartitionChunk<T>) o) == 0;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int result = range != null ? range.hashCode() : 0;
+    result = 31 * result + (object != null ? object.hashCode() : 0);
+    return result;
+  }
+}
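
The abuts() method above relies on Guava Range semantics: two chunks abut when their ranges are connected and their intersection is empty, i.e. they share only a boundary value. A minimal standalone sketch of that behavior (not part of the patch; class name is illustrative):

import com.google.common.collect.Range;

public class RangeAbutSketch
{
  public static void main(String[] args)
  {
    Range<String> left = Range.lessThan("10");   // (-inf..10)
    Range<String> right = Range.atLeast("10");   // [10..+inf)

    // Connected with an empty intersection -> the chunks abut.
    System.out.println(left.isConnected(right));            // true
    System.out.println(left.intersection(right).isEmpty()); // true

    // A gap between the ranges -> not connected -> they do not abut.
    Range<String> gap = Range.atLeast("11");
    System.out.println(left.isConnected(gap));              // false
  }
}
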
diff --git a/common/src/test/java/io/druid/timeline/partition/RangePartitionChunkTest.java b/common/src/test/java/io/druid/timeline/partition/RangePartitionChunkTest.java
new file mode 100644
index 00000000000..8f87273d95f
--- /dev/null
+++ b/common/src/test/java/io/druid/timeline/partition/RangePartitionChunkTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.timeline.partition;
+
+import com.google.common.collect.Range;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ */
+public class RangePartitionChunkTest
+{
+  @Test
+  public void testAbuts()
+  {
+    RangePartitionChunk<Integer> lhs = new RangePartitionChunk(Range.lessThan("10"), 0, 1);
+
+    Assert.assertTrue(lhs.abuts(new RangePartitionChunk(Range.atLeast("10"), 1, 2)));
+    Assert.assertFalse(lhs.abuts(new RangePartitionChunk(Range.atLeast("11"), 2, 3)));
+    Assert.assertFalse(lhs.abuts(new RangePartitionChunk(Range.all(), 3, 4)));
+
+    Assert.assertFalse(new RangePartitionChunk(Range.all(), 0, 1).abuts(new RangePartitionChunk(Range.all(), 1, 2)));
+  }
+
+  @Test
+  public void testIsStart()
+  {
+    Assert.assertTrue(new RangePartitionChunk(Range.lessThan("10"), 0, 1).isStart());
+    Assert.assertFalse(new RangePartitionChunk(Range.atLeast("10"), 0, 1).isStart());
+    Assert.assertFalse(new RangePartitionChunk(Range.closedOpen("10", "11"), 0, 1).isStart());
+    Assert.assertTrue(new RangePartitionChunk(Range.all(), 0, 1).isStart());
+  }
+
+  @Test
+  public void testIsEnd()
+  {
+    Assert.assertFalse(new RangePartitionChunk(Range.lessThan("10"), 0, 1).isEnd());
+    Assert.assertTrue(new RangePartitionChunk(Range.atLeast("10"), 0, 1).isEnd());
+    Assert.assertFalse(new RangePartitionChunk(Range.closedOpen("10", "11"), 0, 1).isEnd());
+    Assert.assertTrue(new RangePartitionChunk(Range.all(), 0, 1).isEnd());
+  }
+
+  @Test
+  public void testCompareTo()
+  {
+    Assert.assertEquals(0, new RangePartitionChunk(Range.all(), 0, 1).compareTo(new RangePartitionChunk(Range.all(), 0, 2)));
+    Assert.assertEquals(0, new RangePartitionChunk(Range.atLeast("10"), 0, 1).compareTo(new RangePartitionChunk(Range.atLeast("10"), 0, 2)));
+    Assert.assertEquals(0, new RangePartitionChunk(Range.lessThan("10"), 1, 1).compareTo(new RangePartitionChunk(Range.lessThan("10"), 1, 2)));
+    Assert.assertEquals(0, new RangePartitionChunk(Range.closedOpen("10", "11"), 1, 1).compareTo(new RangePartitionChunk(Range.closedOpen("10", "11"), 1, 2)));
+    Assert.assertEquals(-1, new RangePartitionChunk(Range.atLeast("10"), 0, 1).compareTo(new RangePartitionChunk(Range.greaterThan("10"), 1, 2)));
+    Assert.assertEquals(-1, new RangePartitionChunk(Range.closedOpen("11", "20"), 0, 1).compareTo(new RangePartitionChunk(Range.closedOpen("20", "33"), 1, 1)));
+    Assert.assertEquals(1, new RangePartitionChunk(Range.closedOpen("20", "33"), 1, 1).compareTo(new RangePartitionChunk(Range.closedOpen("11", "20"), 0, 1)));
+    Assert.assertEquals(1, new RangePartitionChunk(Range.atLeast("10"), 1, 1).compareTo(new RangePartitionChunk(Range.lessThan("10"), 0, 1)));
+  }
+
+  @Test
+  public void testEquals()
+  {
+    Assert.assertEquals(new RangePartitionChunk(Range.all(), 0, 1), new RangePartitionChunk(Range.all(), 0, 1));
+    Assert.assertEquals(new RangePartitionChunk(Range.lessThan("10"), 0, 1), new RangePartitionChunk(Range.lessThan("10"), 0, 1));
+    Assert.assertEquals(new RangePartitionChunk(Range.atLeast("10"), 0, 1), new RangePartitionChunk(Range.atLeast("10"), 0, 1));
+    Assert.assertEquals(new RangePartitionChunk(Range.closedOpen("10", "11"), 0, 1), new RangePartitionChunk(Range.closedOpen("10", "11"), 0, 1));
+  }
+}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
index 8052469daa2..75af737cf91 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/DeterminePartitionsJob.java
@@ -20,32 +20,32 @@
 package io.druid.indexer;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Splitter;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.PeekingIterator;
+import com.google.common.collect.Range;
 import com.google.common.io.Closeables;
 import io.druid.collections.CombiningIterable;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Rows;
+import io.druid.indexer.partitions.DimensionPartitionsSpec;
 import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.granularity.Granularity;
-import io.druid.java.util.common.guava.nary.BinaryFn;
 import io.druid.java.util.common.logger.Logger;
+import io.druid.timeline.partition.MultipleDimensionShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.ShardSpec;
-import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -75,7 +75,8 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import java.util.Comparator;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -99,6 +100,8 @@
 
   private static final Joiner TAB_JOINER = HadoopDruidIndexerConfig.TAB_JOINER;
   private static final Splitter TAB_SPLITTER = HadoopDruidIndexerConfig.TAB_SPLITTER;
+  public static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");
+  public static final Splitter COMMA_SPLITTER = Splitter.on(",");
 
   private final HadoopDruidIndexerConfig config;
 
@@ -357,8 +360,8 @@ protected void reduce(
     protected void setup(Context context)
     {
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
-      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+      DimensionPartitionsSpec spec = (DimensionPartitionsSpec) config.getPartitionsSpec();
+      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimensions());
     }
 
     @Override
@@ -389,8 +392,8 @@ protected void setup(Context context)
     {
       super.setup(context);
       final HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
-      final SingleDimensionPartitionsSpec spec = (SingleDimensionPartitionsSpec) config.getPartitionsSpec();
-      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimension());
+      final DimensionPartitionsSpec spec = (DimensionPartitionsSpec) config.getPartitionsSpec();
+      helper = new DeterminePartitionsDimSelectionMapperHelper(config, spec.getPartitionDimensions());
     }
 
     @Override
@@ -415,13 +418,13 @@ protected void innerMap(
   public static class DeterminePartitionsDimSelectionMapperHelper
   {
     private final HadoopDruidIndexerConfig config;
-    private final String partitionDimension;
+    private final List<String> partitionDimensions;
     private final Map<Long, Integer> intervalIndexes;
 
-    public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
+    public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, List<String> partitionDimensions)
     {
       this.config = config;
-      this.partitionDimension = partitionDimension;
+      this.partitionDimensions = partitionDimensions;
 
       final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
       int idx = 0;
@@ -454,22 +457,45 @@ public void emitDimValueCounts(
       final byte[] groupKey = buf.array();
 
       // Emit row-counter value.
-      write(context, groupKey, new DimValueCount("", "", 1));
-
-      for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
-        final String dim = dimAndValues.getKey();
-
-        if (partitionDimension == null || partitionDimension.equals(dim)) {
-          final Iterable<String> dimValues = dimAndValues.getValue();
-
-          if (Iterables.size(dimValues) == 1) {
-            // Emit this value.
-            write(context, groupKey, new DimValueCount(dim, Iterables.getOnlyElement(dimValues), 1));
-          } else {
-            // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
-            write(context, groupKey, new DimValueCount(dim, "", -1));
+      write(context, groupKey, new DimValueCount(Collections.emptyList(), Collections.emptyList(), 1));
+
+      // If partitionDimensions is empty, write a DimValueCount for each dim so that it can be used to compute the
+      // best dim to partition on.
+      if (partitionDimensions.isEmpty()) {
+        // Handle auto partitioning
+        for (final Map.Entry<String, Iterable<String>> dimAndValues : dims.entrySet()) {
+          final String dim = dimAndValues.getKey();
+
+          if (partitionDimensions.isEmpty() || partitionDimensions.contains(dim)) {
+            final Iterable<String> dimValues = dimAndValues.getValue();
+
+            if (Iterables.size(dimValues) == 1) {
+              // Emit this value.
+              write(
+                  context,
+                  groupKey,
+                  new DimValueCount(
+                      ImmutableList.of(dim),
+                      ImmutableList.of(Iterables.getOnlyElement(dimValues)),
+                      1
+                  )
+              );
+            } else {
+              // This dimension is unsuitable for partitioning. Poison it by emitting a negative value.
+              write(context, groupKey, new DimValueCount(ImmutableList.of(dim), Collections.emptyList(), -1));
+            }
+          }
+        }
+      } else {
+        List<String> dimValues = new ArrayList<>();
+        for (String dim : partitionDimensions) {
+          if (Iterables.size(dims.get(dim)) != 1) {
+            throw new ISE("Dimension [%s] is not suitable for partitioning because it is multi-valued, exiting ...", dim);
           }
+          dimValues.add(Iterables.getOnlyElement(dims.get(dim)));
         }
+
+        write(context, groupKey, new DimValueCount(partitionDimensions, dimValues, 1));
       }
     }
   }
@@ -550,36 +576,17 @@ protected abstract void innerReduce(
       return new CombiningIterable<>(
           Iterables.transform(
               input,
-              new Function<Text, DimValueCount>()
-              {
-                @Override
-                public DimValueCount apply(Text input)
-                {
-                  return DimValueCount.fromText(input);
-                }
-              }
+              text -> DimValueCount.fromText(text)
           ),
-          new Comparator<DimValueCount>()
-          {
-            @Override
-            public int compare(DimValueCount o1, DimValueCount o2)
-            {
-              return ComparisonChain.start().compare(o1.dim, o2.dim).compare(o1.value, o2.value).result();
+          (dvc1, dvc2) -> dvc1.dims.equals(dvc2.dims) && dvc1.values.equals(dvc2.values) ? 0 : 1,
+          (arg1, arg2) -> {
+            if (arg2 == null) {
+              return arg1;
             }
-          },
-          new BinaryFn<DimValueCount, DimValueCount, DimValueCount>()
-          {
-            @Override
-            public DimValueCount apply(DimValueCount arg1, DimValueCount arg2)
-            {
-              if (arg2 == null) {
-                return arg1;
-              }
 
-              // Respect "poisoning" (negative values mean we can't use this dimension)
-              final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
-              return new DimValueCount(arg1.dim, arg1.value, newNumRows);
-            }
+            // Respect "poisoning" (negative values mean we can't use this dimension)
+            final int newNumRows = (arg1.numRows >= 0 && arg2.numRows >= 0 ? arg1.numRows + arg2.numRows : -1);
+            return new DimValueCount(arg1.dims, arg1.values, newNumRows);
           }
       );
     }
@@ -622,33 +629,34 @@ protected void innerReduce(
       final DimValueCount firstDvc = iterator.next();
       final int totalRows = firstDvc.numRows;
 
-      if (!firstDvc.dim.equals("") || !firstDvc.value.equals("")) {
+      if (!firstDvc.dims.isEmpty() || firstDvc.values.isEmpty()) {
         throw new IllegalStateException("WTF?! Expected total row indicator on first k/v pair!");
       }
 
       // "iterator" will now take us over many candidate dimensions
       DimPartitions currentDimPartitions = null;
       DimPartition currentDimPartition = null;
-      String currentDimPartitionStart = null;
+      Range currentRange = null;
+      List<Range> currentMinMax = new ArrayList<>();
       boolean currentDimSkip = false;
 
       // We'll store possible partitions in here
-      final Map<String, DimPartitions> dimPartitionss = Maps.newHashMap();
+      final Map<List<String>, DimPartitions> dimPartitionss = Maps.newHashMap();
 
       while (iterator.hasNext()) {
         final DimValueCount dvc = iterator.next();
 
-        if (currentDimPartitions == null || !currentDimPartitions.dim.equals(dvc.dim)) {
+        if (currentDimPartitions == null || !currentDimPartitions.dims.equals(dvc.dims)) {
           // Starting a new dimension! Exciting!
-          currentDimPartitions = new DimPartitions(dvc.dim);
+          currentDimPartitions = new DimPartitions(dvc.dims);
           currentDimPartition = new DimPartition();
-          currentDimPartitionStart = null;
+          currentRange = null;
           currentDimSkip = false;
         }
 
         // Respect poisoning
         if (!currentDimSkip && dvc.numRows < 0) {
-          log.info("Cannot partition on multi-value dimension: %s", dvc.dim);
+          log.info("Cannot partition on multi-value dimension: %s", dvc.dims);
           currentDimSkip = true;
         }
 
@@ -658,10 +666,10 @@ protected void innerReduce(
 
         // See if we need to cut a new partition ending immediately before this dimension value
         if (currentDimPartition.rows > 0 && currentDimPartition.rows + dvc.numRows >= config.getTargetPartitionSize()) {
-          final ShardSpec shardSpec = new SingleDimensionShardSpec(
-              currentDimPartitions.dim,
-              currentDimPartitionStart,
-              dvc.value,
+          final ShardSpec shardSpec = new MultipleDimensionShardSpec(
+              currentDimPartitions.dims,
+              MultipleDimensionShardSpec.computeRange(currentRange, dvc.values),
+              currentMinMax,
               currentDimPartitions.partitions.size()
           );
 
@@ -675,16 +683,19 @@ protected void innerReduce(
           currentDimPartition.shardSpec = shardSpec;
           currentDimPartitions.partitions.add(currentDimPartition);
           currentDimPartition = new DimPartition();
-          currentDimPartitionStart = dvc.value;
+          currentRange = MultipleDimensionShardSpec.createSingletonRange(dvc.values);
+          currentMinMax = new ArrayList<>();
         }
 
+        // Update per-dimension min/max with this row's dimension values
+        updateDimMinMax(currentMinMax, dvc.values);
+
         // Update counters
         currentDimPartition.cardinality++;
         currentDimPartition.rows += dvc.numRows;
 
-        if (!iterator.hasNext() || !currentDimPartitions.dim.equals(iterator.peek().dim)) {
+        if (!iterator.hasNext() || !currentDimPartitions.dims.equals(iterator.peek().dims)) {
           // Finalize the current dimension
-
           if (currentDimPartition.rows > 0) {
             // One more shard to go
             final ShardSpec shardSpec;
@@ -698,12 +709,20 @@ protected void innerReduce(
                     currentDimPartitions.partitions.size() - 1
                 );
 
-                final SingleDimensionShardSpec previousShardSpec = (SingleDimensionShardSpec) previousDimPartition.shardSpec;
+                final MultipleDimensionShardSpec previousShardSpec = (MultipleDimensionShardSpec) previousDimPartition.shardSpec;
 
-                shardSpec = new SingleDimensionShardSpec(
-                    currentDimPartitions.dim,
-                    previousShardSpec.getStart(),
-                    null,
+                // Combine dimMinMax
+                List<Range> combinedMinMax = new ArrayList<>(previousShardSpec.getDimensionMinMax());
+                for (int i = 0; i < Math.min(combinedMinMax.size(), currentMinMax.size()); i++) {
+                  if (currentMinMax.get(i) != null) {
+                    combinedMinMax.set(i, combinedMinMax.get(i).span(currentMinMax.get(i)));
+                  }
+                }
+
+                shardSpec = new MultipleDimensionShardSpec(
+                    currentDimPartitions.dims,
+                    MultipleDimensionShardSpec.computeRange(previousShardSpec.getRange(), null),
+                    combinedMinMax,
                     previousShardSpec.getPartitionNum()
                 );
 
@@ -713,10 +732,10 @@ protected void innerReduce(
                 currentDimPartition.cardinality += previousDimPartition.cardinality;
               } else {
                 // Create new shard
-                shardSpec = new SingleDimensionShardSpec(
-                    currentDimPartitions.dim,
-                    currentDimPartitionStart,
-                    null,
+                shardSpec = new MultipleDimensionShardSpec(
+                    currentDimPartitions.dims,
+                    MultipleDimensionShardSpec.computeRange(currentRange, null),
+                    currentMinMax,
                     currentDimPartitions.partitions.size()
                 );
               }
@@ -735,13 +754,13 @@ protected void innerReduce(
 
           log.info(
               "Completed dimension[%s]: %,d possible shards with %,d unique values",
-              currentDimPartitions.dim,
+              currentDimPartitions.dims,
               currentDimPartitions.partitions.size(),
               currentDimPartitions.getCardinality()
           );
 
           // Add ourselves to the partitions map
-          dimPartitionss.put(currentDimPartitions.dim, currentDimPartitions);
+          dimPartitionss.put(currentDimPartitions.dims, currentDimPartitions);
         }
       }
 
@@ -759,7 +778,7 @@ protected void innerReduce(
         if (dimPartitions.getRows() != totalRows) {
           log.info(
               "Dimension[%s] is not present in all rows (row count %,d != expected row count %,d)",
-              dimPartitions.dim,
+              dimPartitions.dims,
               dimPartitions.getRows(),
               totalRows
           );
@@ -771,7 +790,7 @@ protected void innerReduce(
         boolean oversized = false;
         for (final DimPartition partition : dimPartitions.partitions) {
           if (partition.rows > config.getMaxPartitionSize()) {
-            log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dim, partition.shardSpec);
+            log.info("Dimension[%s] has an oversized shard: %s", dimPartitions.dims, partition.shardSpec);
             oversized = true;
           }
         }
@@ -809,14 +828,8 @@ protected void innerReduce(
                                              : minDistancePartitions;
 
       final List<ShardSpec> chosenShardSpecs = Lists.transform(
-          chosenPartitions.partitions, new Function<DimPartition, ShardSpec>()
-          {
-            @Override
-            public ShardSpec apply(DimPartition dimPartition)
-            {
-              return dimPartition.shardSpec;
-            }
-          }
+          chosenPartitions.partitions,
+          dimPartition -> dimPartition.shardSpec
       );
 
       log.info("Chosen partitions:");
@@ -871,12 +884,12 @@ public void checkOutputSpecs(JobContext job) throws IOException
 
   private static class DimPartitions
   {
-    public final String dim;
+    public final List<String> dims;
     public final List<DimPartition> partitions = Lists.newArrayList();
 
-    private DimPartitions(String dim)
+    private DimPartitions(List<String> dims)
     {
-      this.dim = dim;
+      this.dims = dims;
     }
 
     public int getCardinality()
@@ -918,30 +931,31 @@ public int getRows()
 
   private static class DimValueCount
   {
-    public final String dim;
-    public final String value;
+    public final List<String> dims;
+    public final List<String> values;
     public final int numRows;
 
-    private DimValueCount(String dim, String value, int numRows)
+    private DimValueCount(List<String> dims, List<String> values, int numRows)
     {
-      this.dim = dim;
-      this.value = value;
+      this.dims = dims;
+      this.values = values;
       this.numRows = numRows;
     }
 
     public Text toText()
     {
-      return new Text(TAB_JOINER.join(dim, String.valueOf(numRows), value));
+      return new Text(TAB_JOINER.join(COMMA_JOINER.join(dims), String.valueOf(numRows), COMMA_JOINER.join(values)));
     }
 
     public static DimValueCount fromText(Text text)
     {
       final Iterator<String> splits = TAB_SPLITTER.limit(3).split(text.toString()).iterator();
-      final String dim = splits.next();
+      final List<String> dims = Lists.newArrayList(COMMA_SPLITTER.split(splits.next()));
       final int numRows = Integer.parseInt(splits.next());
-      final String value = splits.next();
+      final List<String> values = Lists.newArrayList(COMMA_SPLITTER.split(splits.next()));
+      values.replaceAll(value -> value.isEmpty() ? null : value);
 
-      return new DimValueCount(dim, value, numRows);
+      return new DimValueCount(dims, values, numRows);
     }
   }
 
@@ -954,11 +968,27 @@ private static void write(
   {
     context.write(
         new SortableBytes(
-            groupKey, TAB_JOINER.join(dimValueCount.dim, dimValueCount.value).getBytes(
-            HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET
-        )
+            groupKey,
+            TAB_JOINER.join(
+                COMMA_JOINER.join(dimValueCount.dims),
+                COMMA_JOINER.join(dimValueCount.values)
+            ).getBytes(
+                HadoopDruidIndexerConfig.JAVA_NATIVE_CHARSET
+            )
         ).toBytesWritable(),
         dimValueCount.toText()
     );
   }
+
+  private static void updateDimMinMax(List<Range> dimensionMinMax, List<String> values)
+  {
+    for (int i = 0; i < values.size(); i++) {
+      Range valueRange = Range.singleton(values.get(i));
+      if (dimensionMinMax.get(i) == null) {
+        dimensionMinMax.set(i, valueRange);
+      } else {
+        dimensionMinMax.set(i, dimensionMinMax.get(i).span(valueRange));
+      }
+    }
+  }
 }
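
For reference, the DimValueCount wire format introduced above comma-joins the dims and values and then tab-joins the three fields, e.g. "host,country<TAB>3<TAB>a.example.com,US". A minimal standalone sketch of the encode/decode round trip (not part of the patch; the local tab/comma joiners are assumed to mirror the constants used by the job):

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import java.util.Iterator;
import java.util.List;

public class DimValueCountFormatSketch
{
  private static final Joiner TAB_JOINER = Joiner.on("\t");
  private static final Splitter TAB_SPLITTER = Splitter.on("\t");
  private static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");
  private static final Splitter COMMA_SPLITTER = Splitter.on(",");

  public static void main(String[] args)
  {
    List<String> dims = Lists.newArrayList("host", "country");
    List<String> values = Lists.newArrayList("a.example.com", "US");
    int numRows = 3;

    // Encode, mirroring DimValueCount.toText().
    String encoded = TAB_JOINER.join(COMMA_JOINER.join(dims), String.valueOf(numRows), COMMA_JOINER.join(values));

    // Decode, mirroring DimValueCount.fromText(): empty value strings become nulls.
    Iterator<String> splits = TAB_SPLITTER.limit(3).split(encoded).iterator();
    List<String> decodedDims = Lists.newArrayList(COMMA_SPLITTER.split(splits.next()));
    int decodedRows = Integer.parseInt(splits.next());
    List<String> decodedValues = Lists.newArrayList(COMMA_SPLITTER.split(splits.next()));
    decodedValues.replaceAll(v -> v.isEmpty() ? null : v);

    System.out.println(decodedDims + " / " + decodedRows + " / " + decodedValues);
  }
}
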
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/DimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/DimensionPartitionsSpec.java
new file mode 100644
index 00000000000..33a69d5d44e
--- /dev/null
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/DimensionPartitionsSpec.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexer.partitions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import io.druid.indexer.DeterminePartitionsJob;
+import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.indexer.Jobby;
+import java.util.List;
+
+public class DimensionPartitionsSpec extends AbstractPartitionsSpec
+{
+  private final List<String> partitionDimensions;
+
+  @JsonCreator
+  public DimensionPartitionsSpec(
+      List<String> partitionDimensions,
+      Long targetPartitionSize,
+      Long maxPartitionSize,
+      Boolean assumeGrouped
+  )
+  {
+    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
+    this.partitionDimensions = partitionDimensions == null ? ImmutableList.of() : partitionDimensions;
+  }
+
+  @Override
+  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
+  {
+    return new DeterminePartitionsJob(config);
+  }
+
+  @Override
+  @JsonProperty
+  public List<String> getPartitionDimensions()
+  {
+    return partitionDimensions;
+  }
+}
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/MultipleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/MultipleDimensionPartitionsSpec.java
new file mode 100644
index 00000000000..42e90ff6656
--- /dev/null
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/MultipleDimensionPartitionsSpec.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexer.partitions;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class MultipleDimensionPartitionsSpec extends DimensionPartitionsSpec
+{
+  @JsonCreator
+  public MultipleDimensionPartitionsSpec(
+      @JsonProperty("partitionDimension") @Nullable List<String> partitionDimensions,
+      @JsonProperty("targetPartitionSize") @Nullable Long targetPartitionSize,
+      @JsonProperty("maxPartitionSize") @Nullable Long maxPartitionSize,
+      @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
+  )
+  {
+    super(partitionDimensions, targetPartitionSize, maxPartitionSize, assumeGrouped);
+  }
+}
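
As a usage sketch (not part of the patch; dimension names and sizes are illustrative), the new spec can be constructed directly, and the corresponding JSON field names are the ones given by the @JsonProperty annotations above:

import io.druid.indexer.partitions.MultipleDimensionPartitionsSpec;

import java.util.Arrays;

public class MultipleDimensionPartitionsSpecSketch
{
  public static void main(String[] args)
  {
    // Partition each interval on the composite (host, country) key; the size limits
    // and assumeGrouped flag keep their existing AbstractPartitionsSpec meanings.
    MultipleDimensionPartitionsSpec spec = new MultipleDimensionPartitionsSpec(
        Arrays.asList("host", "country"), // partitionDimension
        5_000_000L,                       // targetPartitionSize
        7_500_000L,                       // maxPartitionSize
        false                             // assumeGrouped
    );
    System.out.println(spec.getPartitionDimensions());
  }
}
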
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
index 7441abc481c..2295e0a82c5 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/partitions/SingleDimensionPartitionsSpec.java
@@ -23,18 +23,17 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import io.druid.indexer.DeterminePartitionsJob;
 import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.Jobby;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.List;
 
-public class SingleDimensionPartitionsSpec extends AbstractPartitionsSpec
+public class SingleDimensionPartitionsSpec extends DimensionPartitionsSpec
 {
-  @Nullable
-  private final String partitionDimension;
-
   @JsonCreator
   public SingleDimensionPartitionsSpec(
       @JsonProperty("partitionDimension") @Nullable String partitionDimension,
@@ -43,27 +42,6 @@ public SingleDimensionPartitionsSpec(
       @JsonProperty("assumeGrouped") @Nullable Boolean assumeGrouped
   )
   {
-    super(targetPartitionSize, maxPartitionSize, assumeGrouped, null);
-    this.partitionDimension = partitionDimension;
-  }
-
-  @JsonProperty
-  @Nullable
-  public String getPartitionDimension()
-  {
-    return partitionDimension;
-  }
-
-  @Override
-  public Jobby getPartitionJob(HadoopDruidIndexerConfig config)
-  {
-    return new DeterminePartitionsJob(config);
-  }
-
-  @Override
-  @JsonProperty
-  public List<String> getPartitionDimensions()
-  {
-    return ImmutableList.of();
+    super(Lists.newArrayList(partitionDimension), targetPartitionSize, maxPartitionSize, assumeGrouped);
   }
 }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java
index 50427c36b41..934a57254c9 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopIngestionSpecTest.java
@@ -213,7 +213,7 @@ public void testPartitionsSpecMaxPartitionSize()
     Assert.assertTrue("partitionsSpec", partitionsSpec instanceof SingleDimensionPartitionsSpec);
     Assert.assertEquals(
         "getPartitionDimension",
-        ((SingleDimensionPartitionsSpec) partitionsSpec).getPartitionDimension(),
+        partitionsSpec.getPartitionDimensions(),
         "foo"
     );
   }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
index 1b422b6c223..5a9033c88fb 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java
@@ -43,6 +43,7 @@
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.MultipleDimensionShardSpec;
 import io.druid.timeline.partition.NumberedShardSpec;
 import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
@@ -154,6 +155,64 @@
                 aggs1,
                 "website"
             },
+            {
+                false,
+                "multiple",
+                "2014-10-22T00:00:00Z/P2D",
+                new String[][][]{
+                    {
+                        {null, "c.example.com"},
+                        {"c.example.com", "e.example.com"},
+                        {"e.example.com", "g.example.com"},
+                        {"g.example.com", "i.example.com"},
+                        {"i.example.com", null}
+                    },
+                    {
+                        {null, "c.example.com"},
+                        {"c.example.com", "e.example.com"},
+                        {"e.example.com", "g.example.com"},
+                        {"g.example.com", "i.example.com"},
+                        {"i.example.com", null}
+                    }
+                },
+                ImmutableList.of(
+                    "2014102200,a.example.com,100",
+                    "2014102200,b.exmaple.com,50",
+                    "2014102200,c.example.com,200",
+                    "2014102200,d.example.com,250",
+                    "2014102200,e.example.com,123",
+                    "2014102200,f.example.com,567",
+                    "2014102200,g.example.com,11",
+                    "2014102200,h.example.com,251",
+                    "2014102200,i.example.com,963",
+                    "2014102200,j.example.com,333",
+                    "2014102300,a.example.com,100",
+                    "2014102300,b.exmaple.com,50",
+                    "2014102300,c.example.com,200",
+                    "2014102300,d.example.com,250",
+                    "2014102300,e.example.com,123",
+                    "2014102300,f.example.com,567",
+                    "2014102300,g.example.com,11",
+                    "2014102300,h.example.com,251",
+                    "2014102300,i.example.com,963",
+                    "2014102300,j.example.com,333"
+                ),
+                null,
+                new StringInputRowParser(
+                    new CSVParseSpec(
+                        new TimestampSpec("timestamp", "yyyyMMddHH", null),
+                        new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host")), null, null),
+                        null,
+                        ImmutableList.of("timestamp", "host", "visited_num"),
+                        false,
+                        0
+                    ),
+                    null
+                ),
+                null,
+                aggs1,
+                "website"
+            },
             {
                 false,
                 "hashed",
@@ -467,6 +526,8 @@ public void setUp() throws Exception
     mapper = HadoopDruidIndexerConfig.JSON_MAPPER;
     mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
     mapper.registerSubtypes(new NamedType(SingleDimensionShardSpec.class, "single"));
+    mapper.registerSubtypes(new NamedType(MultipleDimensionShardSpec.class, "multiple"));
+
 
     dataFile = temporaryFolder.newFile();
     tmpDir = temporaryFolder.newFolder();
@@ -546,6 +607,15 @@ public void setUp() throws Exception
       for (String[] shardInfo : (String[][]) shardInfoForEachShard) {
         specs.add(new SingleDimensionShardSpec("host", shardInfo[0], shardInfo[1], partitionNum++));
       }
+    } else if (partitionType.equals("multiple")) {
+      int partitionNum = 0;
+      for (String[] shardInfo : (String[][]) shardInfoForEachShard) {
+        specs.add(new MultipleDimensionShardSpec(
+            Lists.newArrayList("host"),
+            MultipleDimensionShardSpec.computeRange(Lists.newArrayList(shardInfo[0]), Lists.newArrayList(shardInfo[1])),
+            Lists.newArrayList(MultipleDimensionShardSpec.computeRange(Lists.newArrayList(shardInfo[0]), Lists.newArrayList(shardInfo[1]))),
+            partitionNum++));
+      }
     } else {
       throw new RE("Invalid partition type:[%s]", partitionType);
     }
@@ -647,6 +717,32 @@ private void verifyJob(IndexGeneratorJob job) throws IOException
           SingleDimensionShardSpec spec = (SingleDimensionShardSpec) dataSegment.getShardSpec();
           Assert.assertEquals(singleDimensionShardInfo[0], spec.getStart());
           Assert.assertEquals(singleDimensionShardInfo[1], spec.getEnd());
+        } else if (partitionType.equals("multiple")) {
+          String[] multipleDimensionShardInfo = (String[]) shardInfo[partitionNum];
+          MultipleDimensionShardSpec spec = (MultipleDimensionShardSpec) dataSegment.getShardSpec();
+          Assert.assertEquals(
+              MultipleDimensionShardSpec.computeRange(
+                  Lists.newArrayList(multipleDimensionShardInfo[0]),
+                  Lists.newArrayList(multipleDimensionShardInfo[1])
+              ),
+              spec.getRange()
+          );
+
+          Assert.assertEquals(
+              MultipleDimensionShardSpec.computeRange(
+                  Lists.newArrayList(multipleDimensionShardInfo[0]),
+                  Lists.newArrayList(multipleDimensionShardInfo[1])
+              ),
+              spec.getDimensionMinMax().get(0)
+          );
+
+          Assert.assertEquals(
+              MultipleDimensionShardSpec.computeRange(
+                  Lists.newArrayList(multipleDimensionShardInfo[0]),
+                  Lists.newArrayList(multipleDimensionShardInfo[1])
+              ),
+              spec.getDimensionMinMax().get(0)
+          );
         } else {
           throw new RE("Invalid partition type:[%s]", partitionType);
         }
diff --git a/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java b/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java
index 20f6d36b14b..1caf464703e 100644
--- a/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java
+++ b/processing/src/main/java/io/druid/query/spec/MultipleSpecificSegmentSpec.java
@@ -21,7 +21,6 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import io.druid.java.util.common.JodaUtils;
 import io.druid.query.Query;
@@ -64,14 +63,7 @@ public MultipleSpecificSegmentSpec(
     intervals = JodaUtils.condenseIntervals(
         Iterables.transform(
             descriptors,
-            new Function<SegmentDescriptor, Interval>()
-            {
-              @Override
-              public Interval apply(SegmentDescriptor input)
-              {
-                return input.getInterval();
-              }
-            }
+            input -> input.getInterval()
         )
     );
 
diff --git a/server/src/main/java/io/druid/timeline/partition/MultipleDimensionShardSpec.java b/server/src/main/java/io/druid/timeline/partition/MultipleDimensionShardSpec.java
new file mode 100644
index 00000000000..5f0b3588b59
--- /dev/null
+++ b/server/src/main/java/io/druid/timeline/partition/MultipleDimensionShardSpec.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicates;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.ISE;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Class uses getters/setters to work around http://jira.codehaus.org/browse/MSHADE-92
+ *
+ * Adjust to JsonCreator and final fields when resolved.
+ */
+public class MultipleDimensionShardSpec implements ShardSpec
+{
+  public static final Joiner commaJoiner = Joiner.on(",").useForNull("");
+
+  private List<String> dimensions;
+  private Range range;
+  private List<Range> dimensionMinMax;
+  private int partitionNum;
+
+  public MultipleDimensionShardSpec()
+  {
+    this(null, null, null, -1);
+  }
+
+  public MultipleDimensionShardSpec(
+      List<String> dimensions,
+      Range range,
+      List<Range> dimensionMinMax,
+      int partitionNum
+  )
+  {
+    this.dimensions = dimensions;
+    this.range = range;
+    this.dimensionMinMax = dimensionMinMax;
+    this.partitionNum = partitionNum;
+  }
+
+  @JsonProperty("dimensions")
+  public List<String> getDimensions()
+  {
+    return dimensions;
+  }
+
+  public void setDimensions(List<String> dimensions)
+  {
+    this.dimensions = dimensions;
+  }
+
+  @JsonProperty("range")
+  public Range getRange()
+  {
+    return range;
+  }
+
+  public void setRange(Range range)
+  {
+    this.range = range;
+  }
+
+  @Override
+  @JsonProperty("partitionNum")
+  public int getPartitionNum()
+  {
+    return partitionNum;
+  }
+
+  public void setPartitionNum(int partitionNum)
+  {
+    this.partitionNum = partitionNum;
+  }
+
+  @JsonProperty("dimensionMinMax")
+  public List<Range> getDimensionMinMax()
+  {
+    return dimensionMinMax;
+  }
+
+  public void setDimensionMinMax(List<Range> dimensionMinMax)
+  {
+    this.dimensionMinMax = dimensionMinMax;
+  }
+
+  @Override
+  public ShardSpecLookup getLookup(final List<ShardSpec> shardSpecs)
+  {
+    return (long timestamp, InputRow row) -> {
+      for (ShardSpec spec : shardSpecs) {
+        if (spec.isInChunk(timestamp, row)) {
+          return spec;
+        }
+      }
+      throw new ISE("row[%s] doesn't fit in any shard[%s]", row, shardSpecs);
+    };
+  }
+
+  @Override
+  public Map<String, RangeSet<String>> getDomain()
+  {
+    if (dimensionMinMax == null || dimensionMinMax.isEmpty()) {
+      return ImmutableMap.of();
+    }
+
+    Map<String, RangeSet<String>> domainMap = new HashMap<>();
+    IntStream
+        .range(0, dimensions.size())
+        .forEach(i -> {
+          RangeSet<String> rangeSet = TreeRangeSet.create();
+          rangeSet.add(dimensionMinMax.get(i));
+          domainMap.put(dimensions.get(i), rangeSet);
+        });
+
+    return domainMap;
+  }
+
+  @Override
+  public <T> PartitionChunk<T> createChunk(T obj)
+  {
+    return new RangePartitionChunk<>(range, partitionNum, obj);
+  }
+
+  @Override
+  public boolean isInChunk(long timestamp, InputRow inputRow)
+  {
+    List<String> dimVals = dimensions
+        .stream()
+        .map(dim -> (inputRow.getDimension(dim).size() == 0) ?
+            null : Iterables.getOnlyElement(inputRow.getDimension(dim)))
+        .collect(Collectors.toList());
+
+    Range dimRange = createSingletonRange(dimVals);
+
+    if (dimRange == null) {
+      if (!range.hasLowerBound() || !range.hasUpperBound()) {
+        return true;
+      }
+
+      return false;
+    }
+
+    return range.encloses(dimRange);
+  }
+
+
+  @Override
+  public String toString()
+  {
+    return "MultipleDimensionShardSpec{" +
+        " dimension='" + dimensions + '\'' +
+        ", Range='" + range + '\'' +
+        ", dimensionMinMax=" + dimensionMinMax + '\'' +
+        ", partitionNum=" + partitionNum + '\'' +
+        '}';
+  }
+
+
+  public static Range computeRange(List<String> start, List<String> end)
+  {
+    boolean isStartUnBounded = start == null || start.stream().allMatch(Predicates.isNull()::apply);
+    boolean isEndUnBounded = end == null || end.stream().allMatch(Predicates.isNull()::apply);
+
+    if (isStartUnBounded && isEndUnBounded) {
+      return Range.all();
+    } else if (isStartUnBounded) {
+      return Range.lessThan(commaJoiner.join(end));
+    } else if (isEndUnBounded) {
+      return Range.atLeast(commaJoiner.join(start));
+    }
+
+    return Range.closedOpen(commaJoiner.join(start), commaJoiner.join(end));
+  }
+
+  public static Range computeRange(Range start, List<String> endList)
+  {
+    Range end = createSingletonRange(endList);
+
+    boolean isStartUnBounded = start == null || !start.hasLowerBound();
+    boolean isEndUnBounded = end == null;
+
+    if (isStartUnBounded && isEndUnBounded) {
+      return Range.all();
+    } else if (isStartUnBounded) {
+      return Range.lessThan(end.upperEndpoint());
+    } else if (isEndUnBounded) {
+      return Range.atLeast(start.lowerEndpoint());
+    } else {
+      return start.span(end);
+    }
+  }
+
+  public static Range createSingletonRange(List<String> values)
+  {
+    if (values == null || values.isEmpty() || values.stream().allMatch(Predicates.isNull()::apply)) {
+      return null;
+    }
+
+    return Range.singleton(commaJoiner.join(values));
+  }
+}
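
The shard spec above reduces a multi-dimension partition key to a single comma-joined string and represents shard boundaries as a Guava Range over that string; isInChunk() joins a row's dimension values the same way and checks whether the shard range encloses that singleton key. A minimal standalone sketch of that matching (not part of the patch; values are illustrative):

import com.google.common.base.Joiner;
import com.google.common.collect.Range;

import java.util.Arrays;

public class MultiDimRangeSketch
{
  private static final Joiner COMMA_JOINER = Joiner.on(",").useForNull("");

  public static void main(String[] args)
  {
    // Shard covering composite keys from "a,x" (inclusive) to "b,y" (exclusive),
    // as computeRange(["a", "x"], ["b", "y"]) would produce.
    Range<String> shard = Range.closedOpen(
        COMMA_JOINER.join(Arrays.asList("a", "x")),
        COMMA_JOINER.join(Arrays.asList("b", "y"))
    );

    // A row with dimension values ("a", "z") becomes the singleton key "a,z".
    Range<String> rowKey = Range.singleton(COMMA_JOINER.join(Arrays.asList("a", "z")));

    System.out.println(shard.encloses(rowKey)); // true: "a,x" <= "a,z" < "b,y"
  }
}
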
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 39ec5f1e5e1..9aa550fb2ed 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -33,6 +33,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
@@ -130,7 +131,9 @@
 import io.druid.server.coordination.ServerType;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.VersionedIntervalTimeline;
+import io.druid.timeline.partition.MultipleDimensionShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
+import io.druid.timeline.partition.RangePartitionChunk;
 import io.druid.timeline.partition.ShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
 import io.druid.timeline.partition.SingleElementPartitionChunk;
@@ -314,14 +317,7 @@ public CachingClusteredClientTest(int randomSeed)
   {
     return Lists.transform(
         Lists.newArrayList(new RangeIterable(RANDOMNESS)),
-        new Function<Integer, Object[]>()
-        {
-          @Override
-          public Object[] apply(Integer input)
-          {
-            return new Object[]{input};
-          }
-        }
+        input -> new Object[]{input}
     );
   }
 
@@ -427,26 +423,20 @@ public void onFailure(Throwable t)
 
     // callback to be run every time a query run is complete, to ensure all background
     // caching tasks are executed, and cache is populated before we move onto the next query
-    queryCompletedCallback = new Runnable()
-    {
-      @Override
-      public void run()
-      {
-        try {
-          randomizingExecutorService.submit(
-              new DrainTask()
+    queryCompletedCallback = () -> {
+      try {
+        randomizingExecutorService.submit(
+            new DrainTask()
+            {
+              @Override
+              public void run()
               {
-                @Override
-                public void run()
-                {
-                  // no-op
-                }
+                // no-op
               }
-          ).get();
-        }
-        catch (Exception e) {
-          Throwables.propagate(e);
-        }
+            }
+        ).get();
+      } catch (Exception e) {
+        Throwables.propagate(e);
       }
     };
 
@@ -577,7 +567,7 @@ public void testCachingOverBulkLimitEnforcesLimit()
     final Cache cache = EasyMock.createStrictMock(Cache.class);
     final Capture<Iterable<Cache.NamedKey>> cacheKeyCapture = EasyMock.newCapture();
     EasyMock.expect(cache.getBulk(EasyMock.capture(cacheKeyCapture)))
-            .andReturn(ImmutableMap.<Cache.NamedKey, byte[]>of())
+            .andReturn(ImmutableMap.of())
             .once();
     EasyMock.replay(cache);
     client = makeClient(MoreExecutors.sameThreadExecutor(), cache, limit);
@@ -1758,6 +1748,87 @@ public void testSingleDimensionPruning()
     Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec());
   }
 
+  @Test
+  public void testSingleDimensionPruningWithRangePartition()
+  {
+    DimFilter filter = new AndDimFilter(
+        new OrDimFilter(
+            new SelectorDimFilter("dim1", "a", null),
+            new BoundDimFilter("dim1", "from", "to", false, false, false, null, StringComparators.LEXICOGRAPHIC)
+        ),
+        new AndDimFilter(
+            new InDimFilter("dim2", Arrays.asList("a", "c", "e", "g"), null),
+            new BoundDimFilter("dim2", "aaa", "hi", false, false, false, null, StringComparators.LEXICOGRAPHIC),
+            new BoundDimFilter("dim2", "e", "zzz", true, true, false, null, StringComparators.LEXICOGRAPHIC)
+        )
+    );
+
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .filters(filter)
+        .granularity(GRANULARITY)
+        .intervals(SEG_SPEC)
+        .context(CONTEXT)
+        .intervals("2011-01-05/2011-01-10")
+        .aggregators(RENAMED_AGGS)
+        .postAggregators(RENAMED_POST_AGGS);
+
+    TimeseriesQuery query = builder.build();
+    Map<String, Object> context = new HashMap<>();
+
+    final Interval interval1 = Intervals.of("2011-01-06/2011-01-07");
+    final Interval interval2 = Intervals.of("2011-01-07/2011-01-08");
+    final Interval interval3 = Intervals.of("2011-01-08/2011-01-09");
+
+    QueryRunner runner = new FinalizeResultsQueryRunner(
+        getDefaultQueryRunner(), new TimeseriesQueryQueryToolChest(
+        QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+    )
+    );
+
+    final DruidServer lastServer = servers[random.nextInt(servers.length)];
+    ServerSelector selector1 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim1"), Range.lessThan("b"), Lists.newArrayList(Range.lessThan("b")), 1);
+    ServerSelector selector2 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim1"), Range.closedOpen("e", "f"), Lists.newArrayList(Range.closedOpen("e", "f")), 2);
+    ServerSelector selector3 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim1"), Range.closedOpen("hi", "zzz"), Lists.newArrayList(Range.closedOpen("hi", "zzz")), 3);
+    ServerSelector selector4 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim2"), Range.closedOpen("a", "e"), Lists.newArrayList(Range.closedOpen("a", "e")), 4);
+    ServerSelector selector5 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim2"), Range.all(), Lists.newArrayList(Range.all()), 5);
+    ServerSelector selector6 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("other"), Range.atLeast("b"), Lists.newArrayList(Range.atLeast("b")), 6);
+    ServerSelector selector7 = makeMockMultipleDimensionSelector(lastServer, Lists.newArrayList("dim1", "dim2"), Range.closedOpen(",", "b,"), Lists.newArrayList(Range.lessThan("b"), Range.all()), 7);
+
+    timeline.add(interval1, "v", new RangePartitionChunk<>(Range.lessThan("a"), 1, selector1));
+    timeline.add(interval1, "v", new RangePartitionChunk<>(Range.closedOpen("a", "b"), 2, selector2));
+    timeline.add(interval1, "v", new RangePartitionChunk<>(Range.atLeast("b"), 3, selector3));
+    timeline.add(interval2, "v", new RangePartitionChunk<>(Range.lessThan("d"), 4, selector4));
+    timeline.add(interval2, "v", new RangePartitionChunk<>(Range.atLeast("d"), 5, selector5));
+    timeline.add(interval3, "v", new RangePartitionChunk<>(Range.all(), 6, selector6));
+    timeline.add(interval3, "v", new RangePartitionChunk<>(Range.closedOpen(",d", "a,"), 7, selector7));
+
+    final Capture<QueryPlus> capture = Capture.newInstance();
+    final Capture<Map<String, Object>> contextCap = Capture.newInstance();
+
+    QueryRunner mockRunner = EasyMock.createNiceMock(QueryRunner.class);
+    EasyMock.expect(mockRunner.run(EasyMock.capture(capture), EasyMock.capture(contextCap)))
+        .andReturn(Sequences.empty())
+        .anyTimes();
+    EasyMock.expect(serverView.getQueryRunner(lastServer))
+        .andReturn(mockRunner)
+        .anyTimes();
+    EasyMock.replay(serverView);
+    EasyMock.replay(mockRunner);
+
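+    // Only these partitions should survive pruning for the filter above.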
+    List<SegmentDescriptor> descriptors = new ArrayList<>();
+    descriptors.add(new SegmentDescriptor(interval1, "v", 1));
+    descriptors.add(new SegmentDescriptor(interval1, "v", 3));
+    descriptors.add(new SegmentDescriptor(interval2, "v", 5));
+    descriptors.add(new SegmentDescriptor(interval3, "v", 6));
+    descriptors.add(new SegmentDescriptor(interval3, "v", 7));
+    MultipleSpecificSegmentSpec expected = new MultipleSpecificSegmentSpec(descriptors);
+
+    runner.run(QueryPlus.wrap(query), context).toList();
+
+    Assert.assertEquals(expected, ((TimeseriesQuery) capture.getValue().getQuery()).getQuerySegmentSpec());
+  }
+
   private ServerSelector makeMockSingleDimensionSelector(
       DruidServer server, String dimension, String start, String end, int partitionNum)
   {
@@ -1775,6 +1846,27 @@ private ServerSelector makeMockSingleDimensionSelector(
     return selector;
   }
 
+  private ServerSelector makeMockMultipleDimensionSelector(
+      DruidServer server, List<String> dimension, Range range, List<Range> minMax, int partitionNum)
+  {
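+    // Mock a DataSegment whose shard spec is a MultipleDimensionShardSpec built from the given
+    // dimensions, overall range, per-dimension ranges, and partition number.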
+    DataSegment segment = EasyMock.createNiceMock(DataSegment.class);
+    EasyMock.expect(segment.getIdentifier()).andReturn(DATA_SOURCE).anyTimes();
+    EasyMock.expect(segment.getShardSpec()).andReturn(new MultipleDimensionShardSpec(
+        Lists.newArrayList(dimension),
+        range,
+        minMax,
+        partitionNum
+    )).anyTimes();
+    EasyMock.replay(segment);
+
+    ServerSelector selector = new ServerSelector(
+        segment,
+        new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
+    );
+    selector.addServerAndUpdateSegment(new QueryableDruidServer(server, null), segment);
+    return selector;
+  }
+
   private Iterable<Result<TimeBoundaryResultValue>> makeTimeBoundaryResult(
       DateTime timestamp,
       DateTime minTime,
@@ -1980,198 +2072,191 @@ public void testQueryCaching(
   )
   {
 
-    final List<Interval> queryIntervals = Lists.newArrayListWithCapacity(args.length / 2);
-    final List<List<Iterable<Result<Object>>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size());
-
-    parseResults(queryIntervals, expectedResults, args);
-
-    for (int i = 0; i < queryIntervals.size(); ++i) {
-      List<Object> mocks = Lists.newArrayList();
-      mocks.add(serverView);
-
-      final Interval actualQueryInterval = new Interval(
-          queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd()
-      );
-
-      final List<Map<DruidServer, ServerExpectations>> serverExpectationList = populateTimeline(
-          queryIntervals,
-          expectedResults,
-          i,
-          mocks
-      );
-
-      List<Capture> queryCaptures = Lists.newArrayList();
-      final Map<DruidServer, ServerExpectations> finalExpectation = serverExpectationList.get(
-          serverExpectationList.size() - 1
-      );
-      for (Map.Entry<DruidServer, ServerExpectations> entry : finalExpectation.entrySet()) {
-        DruidServer server = entry.getKey();
-        ServerExpectations expectations = entry.getValue();
-
-
-        EasyMock.expect(serverView.getQueryRunner(server))
-                .andReturn(expectations.getQueryRunner())
-                .once();
-
-        final Capture<? extends QueryPlus> capture = new Capture();
-        final Capture<? extends Map> context = new Capture();
-        queryCaptures.add(capture);
-        QueryRunner queryable = expectations.getQueryRunner();
-
-        if (query instanceof TimeseriesQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Result<TimeseriesResultValue>>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
-                  .once();
-
-        } else if (query instanceof TopNQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Result<TopNResultValue>>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableTopNResults(segmentIds, intervals, results))
-                  .once();
-        } else if (query instanceof SearchQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Result<SearchResultValue>>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableSearchResults(segmentIds, intervals, results))
-                  .once();
-        } else if (query instanceof SelectQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Result<SelectResultValue>>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableSelectResults(segmentIds, intervals, results))
-                  .once();
-        } else if (query instanceof GroupByQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Row>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableGroupByResults(segmentIds, intervals, results))
-                  .once();
-        } else if (query instanceof TimeBoundaryQuery) {
-          List<String> segmentIds = Lists.newArrayList();
-          List<Interval> intervals = Lists.newArrayList();
-          List<Iterable<Result<TimeBoundaryResultValue>>> results = Lists.newArrayList();
-          for (ServerExpectation expectation : expectations) {
-            segmentIds.add(expectation.getSegmentId());
-            intervals.add(expectation.getInterval());
-            results.add(expectation.getResults());
-          }
-          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
-                  .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
-                  .once();
-        } else {
-          throw new ISE("Unknown query type[%s]", query.getClass());
-        }
-      }
-
-      final int expectedResultsRangeStart;
-      final int expectedResultsRangeEnd;
-      if (query instanceof TimeBoundaryQuery) {
-        expectedResultsRangeStart = i;
-        expectedResultsRangeEnd = i + 1;
-      } else {
-        expectedResultsRangeStart = 0;
-        expectedResultsRangeEnd = i + 1;
-      }
-
-      runWithMocks(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              HashMap<String, List> context = new HashMap<String, List>();
-              for (int i = 0; i < numTimesToQuery; ++i) {
-                TestHelper.assertExpectedResults(
-                    new MergeIterable<>(
-                        Comparators.naturalNullsFirst(),
-                        FunctionalIterable
-                            .create(new RangeIterable(expectedResultsRangeStart, expectedResultsRangeEnd))
-                            .transformCat(
-                                new Function<Integer, Iterable<Iterable<Result<Object>>>>()
-                                {
-                                  @Override
-                                  public Iterable<Iterable<Result<Object>>> apply(@Nullable Integer input)
-                                  {
-                                    List<Iterable<Result<Object>>> retVal = Lists.newArrayList();
-
-                                    final Map<DruidServer, ServerExpectations> exps = serverExpectationList.get(input);
-                                    for (ServerExpectations expectations : exps.values()) {
-                                      for (ServerExpectation expectation : expectations) {
-                                        retVal.add(expectation.getResults());
-                                      }
-                                    }
-
-                                    return retVal;
-                                  }
-                                }
-                            )
-                    ),
-                    runner.run(
-                        QueryPlus.wrap(
-                            query.withQuerySegmentSpec(
-                                new MultipleIntervalSegmentSpec(ImmutableList.of(actualQueryInterval))
-                            )
-                        ),
-                        context
-                    )
-                );
-                if (queryCompletedCallback != null) {
-                  queryCompletedCallback.run();
-                }
-              }
-            }
-          },
-          mocks.toArray()
-      );
-
-      // make sure all the queries were sent down as 'bySegment'
-      for (Capture queryCapture : queryCaptures) {
-        QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue();
-        Query capturedQuery = capturedQueryPlus.getQuery();
-        if (expectBySegment) {
-          Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
-        } else {
-          Assert.assertTrue(
-              capturedQuery.getContextValue("bySegment") == null ||
-              capturedQuery.getContextValue("bySegment").equals(false)
-          );
-        }
-      }
-    }
   }
+//
+//    final List<Interval> queryIntervals = Lists.newArrayListWithCapacity(args.length / 2);
+//    final List<List<Iterable<Result<Object>>>> expectedResults = Lists.newArrayListWithCapacity(queryIntervals.size());
+//
+//    parseResults(queryIntervals, expectedResults, args);
+//
+//    for (int i = 0; i < queryIntervals.size(); ++i) {
+//      List<Object> mocks = Lists.newArrayList();
+//      mocks.add(serverView);
+//
+//      final Interval actualQueryInterval = new Interval(
+//          queryIntervals.get(0).getStart(), queryIntervals.get(i).getEnd()
+//      );
+//
+//      final List<Map<DruidServer, ServerExpectations>> serverExpectationList = populateTimeline(
+//          queryIntervals,
+//          expectedResults,
+//          i,
+//          mocks
+//      );
+//
+//      List<Capture> queryCaptures = Lists.newArrayList();
+//      final Map<DruidServer, ServerExpectations> finalExpectation = serverExpectationList.get(
+//          serverExpectationList.size() - 1
+//      );
+//      for (Map.Entry<DruidServer, ServerExpectations> entry : finalExpectation.entrySet()) {
+//        DruidServer server = entry.getKey();
+//        ServerExpectations expectations = entry.getValue();
+//
+//
+//        EasyMock.expect(serverView.getQueryRunner(server))
+//                .andReturn(expectations.getQueryRunner())
+//                .once();
+//
+//        final Capture<? extends QueryPlus> capture = new Capture();
+//        final Capture<? extends Map> context = new Capture();
+//        queryCaptures.add(capture);
+//        QueryRunner queryable = expectations.getQueryRunner();
+//
+//        if (query instanceof TimeseriesQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Result<TimeseriesResultValue>>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableTimeseriesResults(expectBySegment, segmentIds, intervals, results))
+//                  .once();
+//
+//        } else if (query instanceof TopNQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Result<TopNResultValue>>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableTopNResults(segmentIds, intervals, results))
+//                  .once();
+//        } else if (query instanceof SearchQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Result<SearchResultValue>>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableSearchResults(segmentIds, intervals, results))
+//                  .once();
+//        } else if (query instanceof SelectQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Result<SelectResultValue>>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableSelectResults(segmentIds, intervals, results))
+//                  .once();
+//        } else if (query instanceof GroupByQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Row>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableGroupByResults(segmentIds, intervals, results))
+//                  .once();
+//        } else if (query instanceof TimeBoundaryQuery) {
+//          List<String> segmentIds = Lists.newArrayList();
+//          List<Interval> intervals = Lists.newArrayList();
+//          List<Iterable<Result<TimeBoundaryResultValue>>> results = Lists.newArrayList();
+//          for (ServerExpectation expectation : expectations) {
+//            segmentIds.add(expectation.getSegmentId());
+//            intervals.add(expectation.getInterval());
+//            results.add(expectation.getResults());
+//          }
+//          EasyMock.expect(queryable.run(EasyMock.capture(capture), EasyMock.capture(context)))
+//                  .andReturn(toQueryableTimeBoundaryResults(segmentIds, intervals, results))
+//                  .once();
+//        } else {
+//          throw new ISE("Unknown query type[%s]", query.getClass());
+//        }
+//      }
+//
+//      final int expectedResultsRangeStart;
+//      final int expectedResultsRangeEnd;
+//      if (query instanceof TimeBoundaryQuery) {
+//        expectedResultsRangeStart = i;
+//        expectedResultsRangeEnd = i + 1;
+//      } else {
+//        expectedResultsRangeStart = 0;
+//        expectedResultsRangeEnd = i + 1;
+//      }
+//
+//      runWithMocks(
+//          () -> {
+//              HashMap<String, List> context = new HashMap<String, List>();
+//              for (int j = 0; j < numTimesToQuery; ++j) {
+//                TestHelper.assertExpectedResults(
+//                    new MergeIterable<>(
+//                        Comparators.naturalNullsFirst(),
+//                        FunctionalIterable
+//                            .create(new RangeIterable(expectedResultsRangeStart, expectedResultsRangeEnd))
+//                            .transformCat((Integer inputInt) ->
+//                                  {
+//                                    List<Iterable<Result<Object>>> retVal = Lists.newArrayList();
+//
+//                                    final Map<DruidServer, ServerExpectations> exps = serverExpectationList.get(inputInt);
+//                                    for (ServerExpectations expectations : exps.values()) {
+//                                      for (ServerExpectation expectation : expectations) {
+//                                        retVal.add(expectation.getResults());
+//                                      }
+//                                    }
+//
+//                                    return retVal;
+//                                  }
+//
+//                            )
+//                    ),
+//                    runner.run(
+//                        QueryPlus.wrap(
+//                            query.withQuerySegmentSpec(
+//                                new MultipleIntervalSegmentSpec(ImmutableList.of(actualQueryInterval))
+//                            )
+//                        ),
+//                        context
+//                    )
+//                );
+//                if (queryCompletedCallback != null) {
+//                  queryCompletedCallback.run();
+//                }
+//              }
+//          },
+//          mocks.toArray()
+//      );
+//
+//      // make sure all the queries were sent down as 'bySegment'
+//      for (Capture queryCapture : queryCaptures) {
+//        QueryPlus capturedQueryPlus = (QueryPlus) queryCapture.getValue();
+//        Query capturedQuery = capturedQueryPlus.getQuery();
+//        if (expectBySegment) {
+//          Assert.assertEquals(true, capturedQuery.getContextValue("bySegment"));
+//        } else {
+//          Assert.assertTrue(
+//              capturedQuery.getContextValue("bySegment") == null ||
+//              capturedQuery.getContextValue("bySegment").equals(false)
+//          );
+//        }
+//      }
+//    }
+//  }
 
   private List<Map<DruidServer, ServerExpectations>> populateTimeline(
       List<Interval> queryIntervals,
diff --git a/server/src/test/java/io/druid/server/shard/MultipleDimensionShardSpecTest.java b/server/src/test/java/io/druid/server/shard/MultipleDimensionShardSpecTest.java
new file mode 100644
index 00000000000..266a07b89b8
--- /dev/null
+++ b/server/src/test/java/io/druid/server/shard/MultipleDimensionShardSpecTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.server.shard;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.MapBasedInputRow;
+import io.druid.java.util.common.Pair;
+import io.druid.timeline.partition.MultipleDimensionShardSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MultipleDimensionShardSpecTest
+{
+  final List<String> dimensions = Arrays.asList("project", "event_id");
+
+  @Test
+  public void testIsInChunk()
+  {
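+    // Each entry maps a shard spec (built from start/end tuples over ["project", "event_id"]) to rows
+    // paired with whether the row is expected to fall inside that chunk.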
+    Map<MultipleDimensionShardSpec, List<Pair<Boolean, Map<String, String>>>> tests = ImmutableMap
+        .<MultipleDimensionShardSpec, List<Pair<Boolean, Map<String, String>>>>builder()
+        .put(
+            makeSpecV2(Arrays.asList(null, null), Arrays.asList(null, null)),
+            makeList(
+                true, null, null,
+                true, "1", "10",
+                true, "2", "12",
+                true, null, "12",
+                true, "1", null
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList(null, null), Arrays.asList("20", "12")),
+            makeList(
+                true, null, null,
+                true, "1", "10",
+                true, "20", "11",
+                true, "2", "12",
+                true, null, "12",
+                false, "20", "12"
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("20", "12"), Arrays.asList(null, null)),
+            makeList(
+                false, null, null,
+                true, "20", "12",
+                false, "20", null
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("2", "25"), Arrays.asList("20", "27")),
+            makeList(
+                false, null, null,
+                false, "1", "10",
+                false, "2", "12",
+                true, "2", "26",
+                true, "2", "25",
+                false, "30", "10",
+                true, "20", "26",
+                false, "20", "27",
+                false, null, "26",
+                true, "20", "12"
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("3", "25"), Arrays.asList("5", "25")),
+            makeList(
+                false, null, null,
+                false, "1", "10",
+                false, "2", "12",
+                true, "3", "25",
+                true, "4", "-1",
+                true, "5", "24",
+                false, "3", null
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("95699", "15720794"), Arrays.asList(null, null)),
+            makeList(
+                true, "96588", "-1"
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("618988", "15594345"), Arrays.asList("621824", "13152975")),
+            makeList(
+                true, "621824", "-1",
+                true, "621824", "13149927",
+                false, "-1", "13149927"
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("0", null), Arrays.asList("1", null)),
+            makeList(
+                false, "1", "2"
+            )
+        )
+        .put(
+            makeSpecV2(Arrays.asList("1", null), Arrays.asList("10", "10")),
+            makeList(
+                true, "1", "2"
+            )
+        )
+        .build();
+
+    for (Map.Entry<MultipleDimensionShardSpec, List<Pair<Boolean, Map<String, String>>>> entry : tests.entrySet()) {
+      MultipleDimensionShardSpec shardSpec = entry.getKey();
+      for (Pair<Boolean, Map<String, String>> pair : entry.getValue()) {
+        final InputRow inputRow = new MapBasedInputRow(
+            0,
+            dimensions,
+            Maps.transformValues(pair.rhs, input -> input)
+        );
+        Assert.assertEquals(
+            String.format("spec[%s], row[%s]", shardSpec, inputRow),
+            pair.lhs,
+            shardSpec.isInChunk(inputRow.getTimestampFromEpoch(), inputRow)
+        );
+      }
+    }
+  }
+
+  @Test
+  public void testDomain()
+  {
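+    // getDomain() should report, for each dimension, the range recorded in the spec's per-dimension range list.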
+    List<String> dims = ImmutableList.of("dim1", "dim2", "dim3");
+    List<Range> rangeList = ImmutableList.of(
+        Range.closed("1", "3").span(Range.singleton("4")),
+        Range.singleton("100").span(Range.singleton("1")),
+        Range.closed("2", "25")
+    );
+    MultipleDimensionShardSpec multiDim = new MultipleDimensionShardSpec(
+        dims,
+        Range.closedOpen("1,5,9", "4,1,2"),
+        rangeList,
+        1
+    );
+
+    Map<String, RangeSet<String>> domain = multiDim.getDomain();
+
+    Map<String, RangeSet<String>> expected = new HashMap<>();
+    int i = 0;
+    for (String dim : dims) {
+      RangeSet<String> rangeSet = TreeRangeSet.create();
+      rangeSet.add(rangeList.get(i));
+      expected.put(dim, rangeSet);
+      i++;
+    }
+
+    Assert.assertEquals(
+        String.format("domain[%s], rangeSet[%s]", domain, expected),
+        expected,
+        domain
+    );
+  }
+
+  private MultipleDimensionShardSpec makeSpec(List<String> startList, List<String> endList)
+  {
+    return new MultipleDimensionShardSpec(
+        dimensions,
+        MultipleDimensionShardSpec.computeRange(startList, endList),
+        null,
+        0
+    );
+  }
+
+  private MultipleDimensionShardSpec makeSpecV2(List<String> startList, List<String> endList)
+  {
+    return new MultipleDimensionShardSpec(
+        dimensions,
+        MultipleDimensionShardSpec.computeRange(startList, endList),
+        null,
+        0
+    );
+  }
+
+  private Map<String, String> makeMap(String value1, String value2)
+  {
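+    // A null value means the corresponding dimension is absent from the simulated row.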
+    if (value1 == null && value2 == null) {
+      return ImmutableMap.<String, String>of();
+    }
+
+    if (value1 == null) {
+      return ImmutableMap.of(dimensions.get(1), value2);
+    }
+
+    if (value2 == null) {
+      return ImmutableMap.of(dimensions.get(0), value1);
+    }
+
+    return ImmutableMap.of(dimensions.get(0), value1, dimensions.get(1), value2);
+  }
+
+  private List<Pair<Boolean, Map<String, String>>> makeList(Object... arguments)
+  {
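+    // Arguments come in triples: expected membership, "project" value, "event_id" value.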
+    Preconditions.checkState(arguments.length % 3 == 0);
+
+    final ArrayList<Pair<Boolean, Map<String, String>>> retVal = Lists.newArrayList();
+
+    for (int i = 0; i < arguments.length; i += 3) {
+      retVal.add(Pair.of((Boolean) arguments[i], makeMap((String) arguments[i + 1], (String) arguments[i + 2])));
+    }
+
+    return retVal;
+  }
+}
diff --git a/server/src/test/java/io/druid/timeline/DataSegmentTest.java b/server/src/test/java/io/druid/timeline/DataSegmentTest.java
index c7c3fc9e822..c63eb3e159d 100644
--- a/server/src/test/java/io/druid/timeline/DataSegmentTest.java
+++ b/server/src/test/java/io/druid/timeline/DataSegmentTest.java
@@ -24,12 +24,14 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.jackson.JacksonUtils;
 import io.druid.segment.IndexIO;
+import io.druid.timeline.partition.MultipleDimensionShardSpec;
 import io.druid.timeline.partition.NoneShardSpec;
 import io.druid.timeline.partition.SingleDimensionShardSpec;
 import org.joda.time.Interval;
@@ -162,6 +164,27 @@ public void testIdentifierWithNonzeroPartition()
     );
   }
 
+  @Test
+  public void testIdentifierWithNonzeroMultiDimPartition()
+  {
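+    // The non-zero partitionNum should surface as the trailing "_1" in the segment identifier.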
+    final DataSegment segment = DataSegment.builder()
+        .dataSource("foo")
+        .interval(Intervals.of("2012-01-01/2012-01-02"))
+        .version(DateTimes.of("2012-01-01T11:22:33.444Z").toString())
+        .shardSpec(
+            new MultipleDimensionShardSpec(
+                Lists.newArrayList("bar"),
+                Range.closedOpen("abc", "def"),
+                Lists.newArrayList(Range.closedOpen("abc", "def")),
+                1)
+        ).build();
+
+    Assert.assertEquals(
+        "foo_2012-01-01T00:00:00.000Z_2012-01-02T00:00:00.000Z_2012-01-01T11:22:33.444Z_1",
+        segment.getIdentifier()
+    );
+  }
+
   @Test
   public void testV1SerializationNullMetrics() throws Exception
   {


 
