You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:10 UTC

[20/50] [abbrv] asterixdb git commit: snapshot for range state transition

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
deleted file mode 100644
index ff2e40b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangeMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-public interface IRangeMap {
-
-    public int getSplitCount();
-
-    public byte[] getByteArray(int columnIndex, int splitIndex);
-
-    public int getStartOffset(int columnIndex, int splitIndex);
-
-    public int getLength(int columnIndex, int splitIndex);
-
-    public int getTag(int columnIndex, int splitIndex);
-
-    // Min value functions
-    public byte[] getMinByteArray(int columnIndex);
-
-    public int getMinStartOffset(int columnIndex);
-
-    public int getMinLength(int columnIndex);
-
-    public int getMinTag(int columnIndex);
-
-    // Max value functions
-    public byte[] getMaxByteArray(int columnIndex);
-
-    public int getMaxStartOffset(int columnIndex);
-
-    public int getMaxLength(int columnIndex);
-
-    public int getMaxTag(int columnIndex);
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
deleted file mode 100644
index dcde70b..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/IRangePartitionType.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.common.data.partition.range;
-
-public interface IRangePartitionType {
-    public enum RangePartitioningType {
-        /**
-         * Partitioning is determined by finding the range partition where the first data point lies.
-         */
-        PROJECT,
-        /**
-         * Partitioning is determined by finding the range partition where the last data point lies.
-         */
-        PROJECT_END,
-        /**
-         * Partitioning is determined by finding all the range partitions where the data has a point.
-         */
-        SPLIT,
-        /**
-         * Partitioning is determined by finding all the range partitions where the data has a point
-         * or comes after the data point.
-         */
-        REPLICATE
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
index 00fb86d..c15d39a 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/RangeMap.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.dataflow.common.data.partition.range;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+
 /**
  * The range map stores the field split values in an byte array.
  * The first and last split values for each column represent the min and max values (not actually split values).

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
new file mode 100644
index 0000000..befaad9
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/RangeId.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.io.Serializable;
+
+/**
+ * Represents a range id in a logical plan.
+ */
+public final class RangeId implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final int id;
+
+    public RangeId(int id) {
+        this.id = id;
+    }
+
+    public int getId() {
+        return id;
+    }
+
+    @Override
+    public String toString() {
+        return "RangeId(#" + id + ")";
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof RangeId)) {
+            return false;
+        } else {
+            return id == ((RangeId) obj).getId();
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
index 068d11a..520ddd9 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitionMergingConnectorDescriptor.java
@@ -34,24 +34,28 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
 import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
 import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
 import org.apache.hyracks.dataflow.std.collectors.SortMergeFrameReader;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
 
 public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNConnectorDescriptor {
     private static final long serialVersionUID = 1L;
 
     private final ITupleRangePartitionComputerFactory tprcf;
+    private final RangeId rangeId;
     private final int[] sortFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory nkcFactory;
 
     public MToNRangePartitionMergingConnectorDescriptor(IConnectorDescriptorRegistry spec,
-            ITupleRangePartitionComputerFactory tprcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
-            INormalizedKeyComputerFactory nkcFactory) {
+            ITupleRangePartitionComputerFactory tprcf, RangeId rangeId, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) {
         super(spec);
         this.tprcf = tprcf;
+        this.rangeId = rangeId;
         this.sortFields = sortFields;
         this.comparatorFactories = comparatorFactories;
         this.nkcFactory = nkcFactory;
@@ -61,9 +65,7 @@ public class MToNRangePartitionMergingConnectorDescriptor extends AbstractMToNCo
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
-        final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, tprcf.createPartitioner());
-        return rangeWriter;
+        return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, tprcf, rangeId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
index 2338993..6c83abb 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/MToNRangePartitioningConnectorDescriptor.java
@@ -34,6 +34,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.std.base.AbstractMToNConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.collectors.IPartitionBatchManager;
 import org.apache.hyracks.dataflow.std.collectors.NonDeterministicPartitionBatchManager;
 import org.apache.hyracks.dataflow.std.collectors.PartitionCollector;
@@ -43,15 +44,17 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec
     private static final long serialVersionUID = 1L;
 
     private final ITupleRangePartitionComputerFactory trpcf;
+    private final RangeId rangeId;
     private final int[] sortFields;
     private final IBinaryComparatorFactory[] comparatorFactories;
     private final INormalizedKeyComputerFactory nkcFactory;
 
     public MToNRangePartitioningConnectorDescriptor(IConnectorDescriptorRegistry spec,
-            ITupleRangePartitionComputerFactory trpcf, int[] sortFields, IBinaryComparatorFactory[] comparatorFactories,
-            INormalizedKeyComputerFactory nkcFactory) {
+            ITupleRangePartitionComputerFactory trpcf, RangeId rangeId, int[] sortFields,
+            IBinaryComparatorFactory[] comparatorFactories, INormalizedKeyComputerFactory nkcFactory) {
         super(spec);
         this.trpcf = trpcf;
+        this.rangeId = rangeId;
         this.sortFields = sortFields;
         this.comparatorFactories = comparatorFactories;
         this.nkcFactory = nkcFactory;
@@ -61,9 +64,7 @@ public class MToNRangePartitioningConnectorDescriptor extends AbstractMToNConnec
     public IFrameWriter createPartitioner(IHyracksTaskContext ctx, RecordDescriptor recordDesc,
             IPartitionWriterFactory edwFactory, int index, int nProducerPartitions, int nConsumerPartitions)
             throws HyracksDataException {
-        final PartitionRangeDataWriter rangeWriter = new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory,
-                recordDesc, trpcf.createPartitioner());
-        return rangeWriter;
+        return new PartitionRangeDataWriter(ctx, nConsumerPartitions, edwFactory, recordDesc, trpcf, rangeId);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
index f8240b8..2740a60 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/connectors/PartitionRangeDataWriter.java
@@ -23,25 +23,38 @@ import java.nio.ByteBuffer;
 import org.apache.hyracks.api.comm.IPartitionWriterFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
+import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.storage.IGrowableIntArray;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
 import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 
 public class PartitionRangeDataWriter extends AbstractPartitionDataWriter {
-    private final ITupleRangePartitionComputer tpc;
+    private final ITupleRangePartitionComputerFactory trpcf;
+    private final RangeId rangeId;
     private final IGrowableIntArray map;
+    private ITupleRangePartitionComputer tpc;
 
     public PartitionRangeDataWriter(IHyracksTaskContext ctx, int consumerPartitionCount,
-            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor, ITupleRangePartitionComputer tpc)
-            throws HyracksDataException {
+            IPartitionWriterFactory pwFactory, RecordDescriptor recordDescriptor,
+            ITupleRangePartitionComputerFactory trpcf, RangeId rangeId) throws HyracksDataException {
         super(ctx, consumerPartitionCount, pwFactory, recordDescriptor);
-        this.tpc = tpc;
+        this.trpcf = trpcf;
+        this.rangeId = rangeId;
         this.map = new IntArrayList(8, 8);
     }
 
     @Override
+    public void open() throws HyracksDataException {
+        super.open();
+        RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+        tpc = trpcf.createPartitioner(rangeState.getRangeMap());
+    }
+
+    @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         if (!allocatedFrame) {
             allocateFrames();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
index 1087cf5..850bf56 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/IMergeJoinCheckerFactory.java
@@ -20,12 +20,13 @@ package org.apache.hyracks.dataflow.std.join;
 
 import java.io.Serializable;
 
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public interface IMergeJoinCheckerFactory extends Serializable {
 
-    IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) throws HyracksDataException;
+    IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) throws HyracksDataException;
 
     RangePartitioningType getLeftPartitioningType();
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
index 649247e..6f0b33b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/MergeJoinOperatorDescriptor.java
@@ -203,7 +203,7 @@ public class MergeJoinOperatorDescriptor extends AbstractOperatorDescriptor {
             locks.setPartitions(nPartitions);
             RecordDescriptor inRecordDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
             final IMergeJoinChecker mjc = mergeJoinCheckerFactory.createMergeJoinChecker(leftKeys, rightKeys,
-                    partition);
+                    partition, null);
             return new RightDataOperator(ctx, partition, inRecordDesc, mjc);
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
index 7ca2542..15df580 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NaturalMergeJoinCheckerFactory.java
@@ -20,8 +20,9 @@ package org.apache.hyracks.dataflow.std.join;
 
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.dataflow.std.util.FrameTuplePairComparator;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory {
     private static final long serialVersionUID = 1L;
@@ -32,7 +33,7 @@ public class NaturalMergeJoinCheckerFactory implements IMergeJoinCheckerFactory
     }
 
     @Override
-    public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition) {
+    public IMergeJoinChecker createMergeJoinChecker(int[] keys0, int[] keys1, int partition, IRangeMap rangeMap) {
         final IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             comparators[i] = comparatorFactories[i].createBinaryComparator();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
index 15d91be..067246d 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/RangeForwardOperatorDescriptor.java
@@ -19,30 +19,23 @@
 package org.apache.hyracks.dataflow.std.misc;
 
 import java.nio.ByteBuffer;
-import java.util.List;
 
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
 import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
 import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.io.GeneratedRunFileReader;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.sort.ISorter;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 
 public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -50,11 +43,15 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final int RANGE_FORWARD_ACTIVITY_ID = 0;
     private static final int RANGE_WRITER_ACTIVITY_ID = 1;
 
+    private final RangeId rangeId;
     private final IRangeMap rangeMap;
 
-    public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, IRangeMap rangeMap) {
+    public RangeForwardOperatorDescriptor(IOperatorDescriptorRegistry spec, RangeId rangeId, IRangeMap rangeMap,
+            RecordDescriptor recordDescriptor) {
         super(spec, 1, 1);
+        this.rangeId = rangeId;
         this.rangeMap = rangeMap;
+        recordDescriptors[0] = recordDescriptor;
     }
 
     @Override
@@ -68,8 +65,8 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
     public static class RangeForwardTaskState extends AbstractStateObject {
         private IRangeMap rangeMap;
 
-        public RangeForwardTaskState(JobId jobId, TaskId taskId, IRangeMap rangeMap) {
-            super(jobId, taskId);
+        public RangeForwardTaskState(JobId jobId, RangeId rangeId, IRangeMap rangeMap) {
+            super(jobId, rangeId);
             this.rangeMap = rangeMap;
         }
 
@@ -93,8 +90,7 @@ public class RangeForwardOperatorDescriptor extends AbstractOperatorDescriptor {
 
                 @Override
                 public void open() throws HyracksDataException {
-                    state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(),
-                            new TaskId(getActivityId(), partition), rangeMap);
+                    state = new RangeForwardTaskState(ctx.getJobletContext().getJobId(), rangeId, rangeMap);
                     ctx.setStateObject(state);
                     writer.open();
                 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
index c3adc20..273d5ba 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/AbstractSorterOperatorDescriptor.java
@@ -19,9 +19,6 @@
 
 package org.apache.hyracks.dataflow.std.sort;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.logging.Level;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
index 2cf166b..637e195 100644
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/FieldRangePartitionComputerFactoryTest.java
@@ -32,10 +32,12 @@ import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.storage.IGrowableIntArray;
 import org.apache.hyracks.data.std.accessors.PointableBinaryRangeAscComparatorFactory;
@@ -48,8 +50,6 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameFixedFieldTupleAppender;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
 import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.dataflow.common.data.partition.range.RangeMap;
 import org.apache.hyracks.storage.common.arraylist.IntArrayList;
 import org.apache.hyracks.test.support.TestUtils;
@@ -143,8 +143,8 @@ public class FieldRangePartitionComputerFactoryTest extends TestCase {
         IHyracksTaskContext ctx = TestUtils.create(FRAME_SIZE);
         int[] rangeFields = new int[] { 0 };
         ITupleRangePartitionComputerFactory frpcf = new FieldRangePartitionComputerFactory(rangeFields,
-                comparatorFactories, rangeMap, rangeType);
-        ITupleRangePartitionComputer partitioner = frpcf.createPartitioner();
+                comparatorFactories, rangeType);
+        ITupleRangePartitionComputer partitioner = frpcf.createPartitioner(rangeMap);
 
         IFrameTupleAccessor accessor = new FrameTupleAccessor(RecordDesc);
         ByteBuffer buffer = prepareData(ctx, integers);