Posted to commits@asterixdb.apache.org by al...@apache.org on 2020/11/11 00:20:51 UTC

[asterixdb] branch master updated: [NO ISSUE][RT] Percentage List RangeMap

This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1282347  [NO ISSUE][RT] Percentage List RangeMap
1282347 is described below

commit 12823479069e8563b6b15a2dcd8b4a0cdd257b85
Author: mikhail <mi...@gmail.com>
AuthorDate: Sun Nov 8 15:42:38 2020 -0800

    [NO ISSUE][RT] Percentage List RangeMap
    
    -user model changes: no
    -storage format changes: no
    -interface changes: no
    
    Details:
     - Dynamically computed RangeMaps now include a percentage list, one
       entry per split point, used by the partitioner to probabilistically
       distribute tuples that are equal to a split value, so heavily
       duplicated split values do not all land in one partition (a sketch
       of the new RangeMap shape follows the diffstat below).
    
    Change-Id: Ie08773688111965ad820d8993fcb886a0aecdcb6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/7903
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Ali Alsuliman <al...@gmail.com>
---
 .../window/pg_win/pg_win.23.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.74.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.75.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.76.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.77.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.78.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.79.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.80.query.sqlpp            |  2 +-
 .../window/pg_win/pg_win.88.query.sqlpp            |  2 +-
 .../asterix/lang/common/util/RangeMapBuilder.java  |  2 +-
 .../std/RangeMapAggregateDescriptor.java           | 80 +++++++++++++++++-----
 .../physical/RangePartitionExchangePOperator.java  |  5 +-
 .../DoubleArraySerializerDeserializer.java         | 69 +++++++++++++++++++
 ...AbstractFieldRangePartitionComputerFactory.java | 67 +++++++++++++++---
 .../range/FieldRangePartitionComputerFactory.java  | 10 ++-
 .../common/data/partition/range/RangeMap.java      | 15 ++--
 .../std/misc/SortForwardOperatorDescriptor.java    |  6 +-
 ...ieldRangeMultiPartitionComputerFactoryTest.java |  4 +-
 18 files changed, 229 insertions(+), 47 deletions(-)
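
The core of the change is a new fourth RangeMap constructor argument: a percentage list with
one entry per split point, produced only when the range map is computed at run time
(statically declared maps pass null). A minimal, hypothetical sketch of the resulting shape;
the split-value bytes and offsets below are placeholders, since their tagged encoding is not
the point here:

    import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;

    public class RangeMapShapeSketch {
        public static void main(String[] args) {
            byte[] splitValueBytes = new byte[3];                // placeholder: 3 serialized split values
            int[] endOffsets = { 1, 2, 3 };                      // end offset of each split value
            double[] percentages = { 100.0 / 3, 50.0, 50.0 };    // per-split share used when a tuple equals the split value

            // Dynamically computed maps carry percentages; static maps (RangeMapBuilder) pass null.
            RangeMap dynamicMap = new RangeMap(1, splitValueBytes, endOffsets, percentages);
            RangeMap staticMap = new RangeMap(1, splitValueBytes, endOffsets, null);

            System.out.println(dynamicMap.getPercentages().length);  // 3
            System.out.println(staticMap.getPercentages());          // null
        }
    }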

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
index a08ad42..7c534a8 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.23.query.sqlpp
@@ -27,4 +27,4 @@ FROM (select distinct ten, four from tenk1) ss
 SELECT four, ten div 4 as two,
   sum(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) as `sum`,
   last_value(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) as `last_value`
-ORDER BY four, ten div 4
+ORDER BY four, ten div 4, `sum`
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
index a251721..b5c7466 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.74.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lag(ten) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
+ORDER BY four, ten, lag
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
index d6d27ab..47f9ca4 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.75.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1 t
 WHERE t.unique2 < 10
 SELECT lag(t.ten) OVER (PARTITION BY t.four ORDER BY t.ten) AS lag, t.ten, t.four
-ORDER BY t.four, t.ten
+ORDER BY t.four, t.ten, lag
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
index ba9be1e..44683bd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.76.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lag(ten, four) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lag
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
index 2081002..98e9b9e 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.77.query.sqlpp
@@ -26,5 +26,5 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lag(ten, four, 0) OVER (PARTITION BY four ORDER BY ten) AS lag, ten, four
-ORDER BY four, ten
+ORDER BY four, ten, lag
 
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
index 518516b..1cd7a2d 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.78.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lead(ten) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
index 37ba499..b0c09b7 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.79.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lead(ten * 2, 1) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
index 226a854..509bcea 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.80.query.sqlpp
@@ -26,4 +26,4 @@ use test;
 FROM tenk1
 WHERE unique2 < 10
 SELECT lead(ten * 2, 1, -1) OVER (PARTITION BY four ORDER BY ten) AS lead, ten, four
-ORDER BY four, ten
\ No newline at end of file
+ORDER BY four, ten, lead
\ No newline at end of file
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
index 08c7446..3a7c8b2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/window/pg_win/pg_win.88.query.sqlpp
@@ -27,4 +27,4 @@ FROM (select distinct ten, four from tenk1) ss
 SELECT four, ten div 4 as two,
   sum(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) AS `sum`,
   last_value(ten div 4) over (partition by four order by ten div 4 rows between unbounded preceding and current row) AS `last_value`
-ORDER BY four, ten div 4
+ORDER BY four, ten div 4, `sum`
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
index 2d749d2..da00cfd 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/util/RangeMapBuilder.java
@@ -94,7 +94,7 @@ public class RangeMapBuilder {
             // TODO Add support for composite fields.
         }
 
-        return new RangeMap(1, abvs.getByteArray(), offsets);
+        return new RangeMap(1, abvs.getByteArray(), offsets, null);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
index 2338841..2084e48 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/RangeMapAggregateDescriptor.java
@@ -55,6 +55,7 @@ import org.apache.hyracks.data.std.primitive.VoidPointable;
 import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 
@@ -145,6 +146,8 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
         private final Comparator<List<byte[]>> comparator;
         private final int numOfPartitions;
         private final int numOrderByFields;
+        private final int[] splitPoints;
+        private final double[] percentages;
 
         @SuppressWarnings("unchecked")
         private RangeMapFunction(IScalarEvaluatorFactory[] args, IEvaluatorContext context, boolean[] ascending,
@@ -155,6 +158,8 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
             this.comparator = createComparator(ascending, argsTypes);
             this.numOfPartitions = numOfPartitions;
             this.numOrderByFields = numOrderByFields;
+            this.splitPoints = new int[numOfPartitions - 1];
+            this.percentages = new double[numOfPartitions - 1];
         }
 
         @Override
@@ -214,26 +219,18 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
                     }
                 } else {
                     finalSamples.sort(comparator);
-                    // divide the samples evenly and pick the boundaries as split points
-                    int nextSplitOffset = (int) Math.ceil(finalSamples.size() / (double) numOfPartitions);
-                    int nextSplitIndex = nextSplitOffset - 1;
-                    int endOffsetsCounter = 0;
-                    int numRequiredSplits = numOfPartitions - 1;
-                    endOffsets = new int[numRequiredSplits * numOrderByFields];
+                    calculateSplitIndexes();
+                    calculatePercentSplit();
+
                     List<byte[]> sample;
-                    for (int split = 1; split <= numRequiredSplits; split++) {
-                        // pick the split point from sorted samples (could be <3> or <4,"John"> if it's multi-column)
-                        sample = finalSamples.get(nextSplitIndex);
-                        for (int column = 0; column < sample.size(); column++) {
-                            allSplitValuesOut.write(sample.get(column));
+                    int endOffsetsCounter = 0;
+                    endOffsets = new int[splitPoints.length * numOrderByFields];
+                    for (int i = 0; i < splitPoints.length; i++) {
+                        sample = finalSamples.get(splitPoints[i]);
+                        for (byte[] column : sample) {
+                            allSplitValuesOut.write(column);
                             endOffsets[endOffsetsCounter++] = storage.getLength();
                         }
-                        // go to the next split point
-                        nextSplitIndex += nextSplitOffset;
-                        // in case we go beyond the boundary of samples, we pick the last sample repeatedly
-                        if (nextSplitIndex >= finalSamples.size()) {
-                            nextSplitIndex = finalSamples.size() - 1;
-                        }
                     }
                 }
             } catch (IOException e) {
@@ -242,6 +239,54 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
             serializeRangeMap(numOrderByFields, storage.getByteArray(), endOffsets, result);
         }
 
+        private void calculateSplitIndexes() {
+            int nextSplitOffset = (int) Math.ceil(finalSamples.size() / (double) numOfPartitions);
+            int nextSplitIndex = nextSplitOffset - 1;
+
+            for (int split = 0; split < splitPoints.length; split++) {
+                splitPoints[split] = nextSplitIndex;
+                nextSplitIndex += nextSplitOffset;
+                // in case we go beyond the boundary of samples, we pick the last sample repeatedly
+                if (nextSplitIndex >= finalSamples.size()) {
+                    nextSplitIndex = finalSamples.size() - 1;
+                }
+            }
+        }
+
+        private void calculatePercentSplit() {
+            for (int i = 0; i < splitPoints.length; i++) {
+                List<byte[]> sampleAtSplit = finalSamples.get(splitPoints[i]);
+                int smallestIndexEqualToSample = splitPoints[i];
+                while (smallestIndexEqualToSample >= 0
+                        && comparator.compare(sampleAtSplit, finalSamples.get(smallestIndexEqualToSample)) == 0) {
+                    smallestIndexEqualToSample--;
+                }
+                smallestIndexEqualToSample++;
+
+                int largestSplitIncludingSample = i;
+                while (largestSplitIncludingSample < splitPoints.length && comparator.compare(sampleAtSplit,
+                        finalSamples.get(splitPoints[largestSplitIncludingSample])) == 0) {
+                    largestSplitIncludingSample++;
+                }
+                largestSplitIncludingSample--;
+
+                int largestIndexEqualToSample = splitPoints[largestSplitIncludingSample];
+                while (largestIndexEqualToSample < finalSamples.size()
+                        && comparator.compare(sampleAtSplit, finalSamples.get(largestIndexEqualToSample)) == 0) {
+                    largestIndexEqualToSample++;
+                }
+                largestIndexEqualToSample--;
+
+                double count = largestIndexEqualToSample - smallestIndexEqualToSample + 1;
+                double waterMark = smallestIndexEqualToSample - 1;
+                for (int j = i; j <= largestSplitIncludingSample; j++) {
+                    percentages[j] = ((splitPoints[j] - waterMark) * 100) / (count);
+                    waterMark = splitPoints[j];
+                }
+                i = largestSplitIncludingSample;
+            }
+        }
+
         @Override
         public void finishPartial(IPointable result) throws HyracksDataException {
             finish(result);
@@ -297,6 +342,7 @@ public class RangeMapAggregateDescriptor extends AbstractAggregateFunctionDynami
             IntegerSerializerDeserializer.write(numberFields, serRangeMap.getDataOutput());
             ByteArraySerializerDeserializer.write(splitValues, serRangeMap.getDataOutput());
             IntArraySerializerDeserializer.write(endOffsets, serRangeMap.getDataOutput());
+            DoubleArraySerializerDeserializer.write(percentages, serRangeMap.getDataOutput());
             binary.setValue(serRangeMap.getByteArray(), serRangeMap.getStartOffset(), serRangeMap.getLength());
             storage.reset();
             binarySerde.serialize(binary, storage.getDataOutput());
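
calculateSplitIndexes() spaces the split points evenly over the sorted samples;
calculatePercentSplit() then measures, for each run of split points that landed on the same
value, what fraction of that value's samples falls into each partition's range. A standalone,
hypothetical walk-through of the same arithmetic with plain ints in place of the serialized
sample tuples:

    import java.util.Arrays;
    import java.util.List;

    public class PercentageSplitSketch {
        public static void main(String[] args) {
            // 10 sorted samples with a heavy duplicate (value 5), 4 target partitions.
            List<Integer> samples = Arrays.asList(3, 5, 5, 5, 5, 5, 5, 8, 9, 9);
            int numOfPartitions = 4;
            int[] splitPoints = new int[numOfPartitions - 1];
            double[] percentages = new double[numOfPartitions - 1];

            // Like calculateSplitIndexes(): evenly spaced indexes into the sorted samples.
            int nextSplitOffset = (int) Math.ceil(samples.size() / (double) numOfPartitions);
            int nextSplitIndex = nextSplitOffset - 1;
            for (int split = 0; split < splitPoints.length; split++) {
                splitPoints[split] = nextSplitIndex;
                nextSplitIndex = Math.min(nextSplitIndex + nextSplitOffset, samples.size() - 1);
            }
            // splitPoints == [2, 5, 8]; the first two both land on the duplicated value 5.

            // Like calculatePercentSplit(): for each run of split points sharing a value,
            // record what fraction of that value's samples falls before each split point.
            for (int i = 0; i < splitPoints.length; i++) {
                int value = samples.get(splitPoints[i]);
                int lo = splitPoints[i];
                while (lo >= 0 && samples.get(lo) == value) {
                    lo--;
                }
                lo++;                                    // first sample equal to the split value
                int lastSplit = i;
                while (lastSplit < splitPoints.length && samples.get(splitPoints[lastSplit]) == value) {
                    lastSplit++;
                }
                lastSplit--;                             // last split point equal to the split value
                int hi = splitPoints[lastSplit];
                while (hi < samples.size() && samples.get(hi) == value) {
                    hi++;
                }
                hi--;                                    // last sample equal to the split value

                double count = hi - lo + 1;
                double waterMark = lo - 1;
                for (int j = i; j <= lastSplit; j++) {
                    percentages[j] = ((splitPoints[j] - waterMark) * 100) / count;
                    waterMark = splitPoints[j];
                }
                i = lastSplit;
            }

            // Prints [2, 5, 8] and roughly [33.3, 50.0, 50.0]: of the six samples equal
            // to 5, 2/6 sit at or before split 0 and 3/6 between splits 0 and 1; the
            // remaining 1/6 spills to the next partition at partitioning time.
            System.out.println(Arrays.toString(splitPoints));
            System.out.println(Arrays.toString(percentages));
        }
    }
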
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 9c80f4c..5d943c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -78,8 +78,9 @@ public final class RangePartitionExchangePOperator extends AbstractRangeExchange
     public Pair<IConnectorDescriptor, TargetConstraint> createConnectorDescriptor(IConnectorDescriptorRegistry spec,
             ILogicalOperator op, IOperatorSchema opSchema, JobGenContext context) throws AlgebricksException {
         Pair<int[], IBinaryComparatorFactory[]> pOrderColumns = createOrderColumnsAndComparators(op, opSchema, context);
-        FieldRangePartitionComputerFactory partitionerFactory = new FieldRangePartitionComputerFactory(
-                pOrderColumns.first, pOrderColumns.second, crateRangeMapSupplier(), op.getSourceLocation());
+        FieldRangePartitionComputerFactory partitionerFactory =
+                new FieldRangePartitionComputerFactory(pOrderColumns.first, pOrderColumns.second,
+                        crateRangeMapSupplier(), op.getSourceLocation(), rangeMapIsComputedAtRunTime);
         IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, partitionerFactory);
         return new Pair<>(conn, null);
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java
new file mode 100644
index 0000000..b8a2b1b
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/marshalling/DoubleArraySerializerDeserializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.marshalling;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DoubleArraySerializerDeserializer implements ISerializerDeserializer<double[]> {
+    private static final long serialVersionUID = 1L;
+
+    public static final DoubleArraySerializerDeserializer INSTANCE = new DoubleArraySerializerDeserializer();
+
+    private DoubleArraySerializerDeserializer() {
+    }
+
+    @Override
+    public double[] deserialize(DataInput in) throws HyracksDataException {
+        return read(in);
+    }
+
+    @Override
+    public void serialize(double[] instance, DataOutput out) throws HyracksDataException {
+        write(instance, out);
+    }
+
+    public static double[] read(DataInput in) throws HyracksDataException {
+        try {
+            int len = in.readInt();
+            double[] array = new double[len];
+            for (int i = 0; i < array.length; ++i) {
+                array[i] = in.readDouble();
+            }
+            return array;
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    public static void write(double[] instance, DataOutput out) throws HyracksDataException {
+        try {
+            out.writeInt(instance.length);
+            for (int i = 0; i < instance.length; ++i) {
+                out.writeDouble(instance[i]);
+            }
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+}
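
The new serde uses the same length-prefixed layout as the existing
IntArraySerializerDeserializer: one int for the array length followed by that many doubles.
A small, hypothetical round-trip using only JDK streams:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Arrays;

    import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;

    public class DoubleArrayRoundTrip {
        public static void main(String[] args) throws Exception {
            double[] percentages = { 100.0 / 3, 50.0, 50.0 };

            // Length-prefixed encoding: one int (array length) followed by that many doubles.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DoubleArraySerializerDeserializer.write(percentages, new DataOutputStream(bytes));

            double[] back = DoubleArraySerializerDeserializer.read(
                    new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(Arrays.equals(percentages, back));   // true
        }
    }
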
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
index cefd38a..f176469 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/AbstractFieldRangePartitionComputerFactory.java
@@ -21,6 +21,8 @@ package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import java.io.Serializable;
 import java.util.BitSet;
+import java.util.Random;
+import java.util.function.Supplier;
 
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -61,11 +63,10 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
 
         final IHyracksTaskContext taskContext;
 
-        final RangeMapPartitionComputer rangeMapPartitionComputer;
+        RangeMapPartitionComputer rangeMapPartitionComputer;
 
         private AbstractFieldRangePartitionComputer(IHyracksTaskContext taskContext) {
             this.taskContext = taskContext;
-            this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
         }
 
         public void initialize() throws HyracksDataException {
@@ -76,8 +77,10 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
     abstract class AbstractFieldRangeSinglePartitionComputer extends AbstractFieldRangePartitionComputer
             implements ITuplePartitionComputer {
 
-        AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext) {
+        AbstractFieldRangeSinglePartitionComputer(IHyracksTaskContext taskContext,
+                Supplier<RangeMapPartitionComputer> supplier) {
             super(taskContext);
+            this.rangeMapPartitionComputer = supplier.get();
         }
 
         @Override
@@ -96,6 +99,7 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
 
         AbstractFieldRangeMultiPartitionComputer(IHyracksTaskContext taskContext) {
             super(taskContext);
+            this.rangeMapPartitionComputer = new RangeMapPartitionComputer();
         }
 
         @Override
@@ -127,11 +131,11 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
                 throws HyracksDataException;
     }
 
-    final class RangeMapPartitionComputer {
+    class RangeMapPartitionComputer {
 
-        private RangeMap rangeMap;
+        protected RangeMap rangeMap;
 
-        private IBinaryComparator[] comparators;
+        protected IBinaryComparator[] comparators;
 
         protected void initialize(IHyracksTaskContext taskContext) throws HyracksDataException {
             rangeMap = rangeMapSupplier.getRangeMap(taskContext);
@@ -164,7 +168,7 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
             return (int) Math.floor(slotIndex / rangesPerPart);
         }
 
-        private int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+        protected int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
                 throws HyracksDataException {
             int slotIndex = 0;
             for (int slotNumber = 0, n = rangeMap.getSplitCount(); slotNumber < n; ++slotNumber) {
@@ -190,7 +194,7 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
             return slotIndex;
         }
 
-        private int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
+        protected int compareSlotAndFields(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields, int slotNumber)
                 throws HyracksDataException {
             int c = 0;
             int startOffset = accessor.getTupleStartOffset(tIndex);
@@ -209,4 +213,51 @@ abstract class AbstractFieldRangePartitionComputerFactory implements Serializabl
             return c;
         }
     }
+
+    final class PercentageRangeMapPartitionComputer extends RangeMapPartitionComputer {
+        private final Random r = new Random();
+
+        @Override
+        protected int findRangeMapSlot(IFrameTupleAccessor accessor, int tIndex, int[] rangeFields)
+                throws HyracksDataException {
+            int slotIndex = 0;
+            for (int slotNumber = 0; slotNumber < rangeMap.getSplitCount(); ++slotNumber) {
+                int c = compareSlotAndFields(accessor, tIndex, rangeFields, slotNumber);
+                if (c == 0) {
+                    double percent = 100 * r.nextDouble();
+                    int slotIterator = slotNumber;
+
+                    while (slotIterator < rangeMap.getSplitCount()
+                            && compareSplittingVector(slotIterator, slotNumber) == 0) {
+                        percent -= rangeMap.getPercentages()[slotIterator];
+                        if (percent <= 0) {
+                            break;
+                        }
+                        slotIterator++;
+                    }
+                    return slotIterator;
+                }
+                if (c < 0) {
+                    return slotIndex;
+                }
+                slotIndex++;
+            }
+            return slotIndex;
+        }
+
+        private int compareSplittingVector(int slotNumber1, int slotNumber2) throws HyracksDataException {
+            int c = 0;
+            for (int fieldNum = 0; fieldNum < comparators.length; ++fieldNum) {
+                c = comparators[fieldNum].compare(rangeMap.getByteArray(),
+                        rangeMap.getStartOffset(fieldNum, slotNumber1), rangeMap.getLength(fieldNum, slotNumber1),
+                        rangeMap.getByteArray(), rangeMap.getStartOffset(fieldNum, slotNumber2),
+                        rangeMap.getLength(fieldNum, slotNumber2));
+                if (c != 0) {
+                    return c;
+                }
+            }
+            return c;
+        }
+    }
+
 }
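
PercentageRangeMapPartitionComputer changes behavior only for tuples that compare equal to a
split value: it draws a random percentage and walks the run of equal split slots, subtracting
each slot's share, so a heavily duplicated split value is spread over the partitions that
contain it instead of piling up in one. A hypothetical illustration with plain int split
values standing in for the serialized fields compared by compareSlotAndFields and
compareSplittingVector:

    import java.util.Arrays;
    import java.util.Random;

    public class PercentageSlotWalkSketch {
        public static void main(String[] args) {
            int[] splitValues = { 5, 5, 9 };                     // split values from the earlier sketch
            double[] percentages = { 100.0 / 3, 50.0, 50.0 };    // matching percentage list
            int tupleValue = 5;                                  // collides with the duplicated split value
            Random r = new Random();

            int[] hits = new int[splitValues.length + 1];
            for (int trial = 0; trial < 100_000; trial++) {
                hits[findSlot(tupleValue, splitValues, percentages, r)]++;
            }
            // Roughly 33% of the colliding tuples land in partition 0, 50% in 1, 17% in 2,
            // matching the sample proportions computed by calculatePercentSplit().
            System.out.println(Arrays.toString(hits));
        }

        static int findSlot(int value, int[] splitValues, double[] percentages, Random r) {
            for (int slot = 0; slot < splitValues.length; slot++) {
                int c = Integer.compare(value, splitValues[slot]);
                if (c == 0) {
                    // Equal to a split value: spend a random percentage across the run of equal slots.
                    double percent = 100 * r.nextDouble();
                    int it = slot;
                    while (it < splitValues.length && splitValues[it] == splitValues[slot]) {
                        percent -= percentages[it];
                        if (percent <= 0) {
                            break;
                        }
                        it++;
                    }
                    return it;
                }
                if (c < 0) {
                    return slot;                                 // strictly below this split value
                }
            }
            return splitValues.length;                           // above every split value
        }
    }
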
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index 0816f2d..e8e8e14 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -29,19 +29,23 @@ import org.apache.hyracks.api.exceptions.SourceLocation;
 public final class FieldRangePartitionComputerFactory extends AbstractFieldRangePartitionComputerFactory
         implements ITuplePartitionComputerFactory {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
     private final int[] rangeFields;
 
+    private final boolean usePercentage;
+
     public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryComparatorFactory[] comparatorFactories,
-            RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation) {
+            RangeMapSupplier rangeMapSupplier, SourceLocation sourceLocation, boolean usePercentage) {
         super(rangeMapSupplier, comparatorFactories, sourceLocation);
         this.rangeFields = rangeFields;
+        this.usePercentage = usePercentage;
     }
 
     @Override
     public ITuplePartitionComputer createPartitioner(IHyracksTaskContext taskContext) {
-        return new AbstractFieldRangeSinglePartitionComputer(taskContext) {
+        return new AbstractFieldRangeSinglePartitionComputer(taskContext,
+                usePercentage ? PercentageRangeMapPartitionComputer::new : RangeMapPartitionComputer::new) {
             @Override
             protected int computePartition(IFrameTupleAccessor accessor, int tIndex, int nParts)
                     throws HyracksDataException {
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index aaacceb..146de3b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -35,16 +35,18 @@ import java.util.Arrays;
  * </pre>
  */
 public class RangeMap implements Serializable {
-    private static final long serialVersionUID = -7523433293419648234L;
+    private static final long serialVersionUID = 1L;
 
     private final int fields;
     private final byte[] bytes;
     private final int[] endOffsets;
+    private final double[] percentages;
 
-    public RangeMap(int numFields, byte[] bytes, int[] endOffsets) {
+    public RangeMap(int numFields, byte[] bytes, int[] endOffsets, double[] percentages) {
         this.fields = numFields;
         this.bytes = bytes;
         this.endOffsets = endOffsets;
+        this.percentages = percentages;
     }
 
     public int getSplitCount() {
@@ -55,6 +57,10 @@ public class RangeMap implements Serializable {
         return bytes;
     }
 
+    public double[] getPercentages() {
+        return percentages;
+    }
+
     public int getTag(int fieldIndex, int splitIndex) {
         return getSplitValueTag(getSplitValueIndex(fieldIndex, splitIndex));
     }
@@ -111,7 +117,7 @@ public class RangeMap implements Serializable {
 
     @Override
     public int hashCode() {
-        return fields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets);
+        return fields + Arrays.hashCode(bytes) + Arrays.hashCode(endOffsets) + Arrays.hashCode(percentages);
     }
 
     @Override
@@ -124,11 +130,12 @@ public class RangeMap implements Serializable {
         }
         RangeMap other = (RangeMap) object;
         return fields == other.fields && Arrays.equals(endOffsets, other.endOffsets)
-                && Arrays.equals(bytes, other.bytes);
+                && Arrays.equals(bytes, other.bytes) && Arrays.equals(percentages, other.percentages);
     }
 
     @Override
     public String toString() {
         return "{SPLIT:" + getSplitCount() + '}';
     }
+
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
index 1daf9fb..b28b2d1 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SortForwardOperatorDescriptor.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.data.std.primitive.ByteArrayPointable;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
@@ -130,6 +131,7 @@ public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescri
         private int numFields;
         private byte[] splitValues;
         private int[] splitValuesEndOffsets;
+        private double[] percentages;
 
         private RangeMapReaderActivityNodePushable(IHyracksTaskContext ctx, RecordDescriptor inputRecordDescriptor,
                 ActivityId activityId, int partition) {
@@ -138,6 +140,7 @@ public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescri
             this.frameTupleReference = new FrameTupleReference();
             this.activityId = activityId;
             this.partition = partition;
+            this.numFields = -1;
         }
 
         @Override
@@ -165,6 +168,7 @@ public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescri
             numFields = IntegerSerializerDeserializer.read(dataInputStream);
             splitValues = ByteArraySerializerDeserializer.read(dataInputStream);
             splitValuesEndOffsets = IntArraySerializerDeserializer.read(dataInputStream);
+            percentages = DoubleArraySerializerDeserializer.read(dataInputStream);
         }
 
         @Override
@@ -181,7 +185,7 @@ public class SortForwardOperatorDescriptor extends AbstractForwardOperatorDescri
             // store the range map in the state object of ctx so that next activity (forward) could retrieve it
             TaskId rangeMapReaderTaskId = new TaskId(activityId, partition);
             RangeMapState rangeMapState = new RangeMapState(ctx.getJobletContext().getJobId(), rangeMapReaderTaskId);
-            rangeMapState.rangeMap = new RangeMap(numFields, splitValues, splitValuesEndOffsets);
+            rangeMapState.rangeMap = new RangeMap(numFields, splitValues, splitValuesEndOffsets, percentages);
             ctx.setStateObject(rangeMapState);
         }
     }
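
The payload read here is the counterpart of serializeRangeMap() in RangeMapAggregateDescriptor:
an int field count, a length-prefixed byte array of split values, a length-prefixed int array
of end offsets, and now a length-prefixed double array of percentages. A hypothetical
round-trip of that layout with placeholder values:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hyracks.dataflow.common.data.marshalling.ByteArraySerializerDeserializer;
    import org.apache.hyracks.dataflow.common.data.marshalling.DoubleArraySerializerDeserializer;
    import org.apache.hyracks.dataflow.common.data.marshalling.IntArraySerializerDeserializer;
    import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;

    public class RangeMapPayloadSketch {
        public static void main(String[] args) throws Exception {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);

            // Writer side, in the order used by serializeRangeMap() above.
            IntegerSerializerDeserializer.write(1, out);                        // numFields
            ByteArraySerializerDeserializer.write(new byte[] { 1, 2, 3 }, out); // split values (opaque here)
            IntArraySerializerDeserializer.write(new int[] { 1, 2, 3 }, out);   // end offsets
            DoubleArraySerializerDeserializer.write(new double[] { 100.0 / 3, 50.0, 50.0 }, out);

            // Reader side, in the order used by SortForwardOperatorDescriptor above.
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            int numFields = IntegerSerializerDeserializer.read(in);
            byte[] splitValues = ByteArraySerializerDeserializer.read(in);
            int[] endOffsets = IntArraySerializerDeserializer.read(in);
            double[] percentages = DoubleArraySerializerDeserializer.read(in);
            System.out.println(numFields + " " + splitValues.length + " "
                    + endOffsets.length + " " + percentages.length);            // 1 3 3 3
        }
    }
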
diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
index a61cd40..668e20f 100644
--- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-dataflow-common-test/src/test/java/org.apache.hyracks/dataflow/common/data/partition/range/AbstractFieldRangeMultiPartitionComputerFactoryTest.java
@@ -136,7 +136,7 @@ public abstract class AbstractFieldRangeMultiPartitionComputerFactoryTest extend
         for (int i = 0; i < integers.length; ++i) {
             offsets[i] = (i + 1) * INTEGER_LENGTH;
         }
-        return new RangeMap(1, getIntegerBytes(integers), offsets);
+        return new RangeMap(1, getIntegerBytes(integers), offsets, null);
     }
 
     private ByteBuffer prepareData(IHyracksTaskContext ctx, Long[] startPoints, Long duration)
@@ -194,7 +194,7 @@ public abstract class AbstractFieldRangeMultiPartitionComputerFactoryTest extend
         SourceLocation sourceLocation = new SourceLocation(0, 0);
 
         ITuplePartitionComputerFactory itpcf = new FieldRangePartitionComputerFactory(rangeFields,
-                minComparatorFactories, rangeMapSupplier, sourceLocation);
+                minComparatorFactories, rangeMapSupplier, sourceLocation, false);
 
         executeFieldRangePartitionTests(integers, itpcf, nParts, results, duration);