You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by pr...@apache.org on 2016/10/17 19:55:11 UTC

[21/50] [abbrv] asterixdb git commit: snapshot for range state transition

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
index d6cd363..9ca536b 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalindex/IntervalIndexJoiner.java
@@ -82,7 +82,7 @@ public class IntervalIndexJoiner extends AbstractMergeJoiner {
         super(ctx, partition, status, locks, leftRd, rightRd);
         this.point = imjcf.isOrderAsc() ? EndPointIndexItem.START_POINT : EndPointIndexItem.END_POINT;
 
-        this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition);
+        this.imjc = imjcf.createMergeJoinChecker(leftKeys, rightKeys, partition, null);
 
         this.leftKey = leftKeys[0];
         this.rightKey = rightKeys[0];

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
index 0dd358c..8c4c43d 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionJoinOperatorDescriptor.java
@@ -37,12 +37,13 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
 import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputSinkOperatorNodePushable;
 import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
+import org.apache.hyracks.dataflow.std.base.RangeId;
+import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor.RangeForwardTaskState;
 
 public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDescriptor {
     private static final long serialVersionUID = 1L;
@@ -65,14 +66,14 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
     private final int probeKey;
     private final int buildKey;
     private final IIntervalMergeJoinCheckerFactory imjcf;
-    private final IRangeMap rangeMap;
+    private final RangeId rangeId;
 
     private static final Logger LOGGER = Logger.getLogger(IntervalPartitionJoinOperatorDescriptor.class.getName());
 
     public IntervalPartitionJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memsize, long leftTupleCount,
             long rightTupleCount, long leftMaxDuration, long rightMaxDuration, int avgTuplesPerFrame, int[] leftKeys,
             int[] rightKeys, RecordDescriptor recordDescriptor, IIntervalMergeJoinCheckerFactory imjcf,
-            IRangeMap rangeMap) {
+            RangeId rangeId) {
         super(spec, 2, 1);
         this.memsize = memsize;
         this.buildKey = leftKeys[0];
@@ -86,7 +87,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
         this.probeKeys = rightKeys;
         recordDescriptors[0] = recordDescriptor;
         this.imjcf = imjcf;
-        this.rangeMap = rangeMap;
+        this.rangeId = rangeId;
     }
 
     @Override
@@ -137,8 +138,6 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
             final RecordDescriptor probeRd = recordDescProvider.getInputRecordDescriptor(probeAid, 0);
             final int k = IntervalPartitionUtil.determineK(buildTupleCount, buildMaxDuration, probeTupleCount,
                     probeMaxDuration, avgTuplesPerFrame);
-            final long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeMap, partition);
-            final long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeMap, partition);
 
             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private BuildAndPartitionTaskState state = new BuildAndPartitionTaskState(
@@ -160,6 +159,10 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
                                     + ") with 3.");
                         }
                     }
+
+                    RangeForwardTaskState rangeState = (RangeForwardTaskState) ctx.getStateObject(rangeId);
+                    long partitionStart = IntervalPartitionUtil.getStartOfPartition(rangeState.getRangeMap(), partition);
+                    long partitionEnd = IntervalPartitionUtil.getEndOfPartition(rangeState.getRangeMap(), partition);
                     ITuplePartitionComputer buildHpc = new IntervalPartitionComputerFactory(buildKey, k, partitionStart,
                             partitionEnd).createPartitioner();
                     ITuplePartitionComputer probeHpc = new IntervalPartitionComputerFactory(probeKey, k, partitionStart,
@@ -168,7 +171,7 @@ public class IntervalPartitionJoinOperatorDescriptor extends AbstractOperatorDes
                     state.partition = partition;
                     state.intervalPartitions = IntervalPartitionUtil.getMaxPartitions(state.k);
                     state.memoryForJoin = memsize;
-                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition);
+                    IIntervalMergeJoinChecker imjc = imjcf.createMergeJoinChecker(buildKeys, probeKeys, partition, rangeState.getRangeMap());
                     state.ipj = new IntervalPartitionJoiner(ctx, state.memoryForJoin, state.k, state.intervalPartitions,
                             BUILD_REL, PROBE_REL, imjc, buildRd, probeRd, buildHpc, probeHpc);
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
index 4c80ba8..415feae 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/joins/intervalpartition/IntervalPartitionUtil.java
@@ -28,8 +28,8 @@ import java.util.Map.Entry;
 import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinChecker;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
 
 public class IntervalPartitionUtil {
     public static final double C_CPU = 0.5;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
index 328697d..dfd36a2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java
@@ -33,7 +33,7 @@ import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 
-public interface IOptimizationContext extends ITypingContext, IVariableContext {
+public interface IOptimizationContext extends ITypingContext, IVariableContext, IRangeContext {
 
     @Override
     public IMetadataProvider<?, ?> getMetadataProvider();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
new file mode 100644
index 0000000..9a70b7a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IRangeContext.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.base;
+
+import org.apache.hyracks.dataflow.std.base.RangeId;
+
+public interface IRangeContext {
+
+    public RangeId newRangeId();
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
index 5a3bc98..58c7a80 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/RangeForwardOperator.java
@@ -28,16 +28,23 @@ import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagation
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 
 public class RangeForwardOperator extends AbstractLogicalOperator {
 
+    private RangeId rangeId;
     private IRangeMap rangeMap;
 
-    public RangeForwardOperator(IRangeMap rangeMap) {
+    public RangeForwardOperator(RangeId rangeId, IRangeMap rangeMap) {
+        this.rangeId = rangeId;
         this.rangeMap = rangeMap;
     }
 
+    public RangeId getRangeId() {
+        return rangeId;
+    }
+
     public IRangeMap getRangeMap() {
         return rangeMap;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 8cb739a..7a5a89a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -410,7 +410,7 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         // TODO fix deep copy of range map
-        RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeMap());
+        RangeForwardOperator opCopy = new RangeForwardOperator(op.getRangeId(), op.getRangeMap());
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
         return opCopy;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index 6337187..fea0431 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -390,7 +390,7 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
 
     @Override
     public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
-        return new RangeForwardOperator(op.getRangeMap());
+        return new RangeForwardOperator(op.getRangeId(), op.getRangeMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
index 34c707b..91dba24 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HashPartitionExchangePOperator.java
@@ -100,7 +100,7 @@ public class HashPartitionExchangePOperator extends AbstractExchangePOperator {
         }
         ITuplePartitionComputerFactory tpcf = new FieldHashPartitionComputerFactory(keys, hashFunctionFactories);
         IConnectorDescriptor conn = new MToNPartitioningConnectorDescriptor(spec, tpcf);
-        return new Pair<IConnectorDescriptor, TargetConstraint>(conn, null);
+        return new Pair<>(conn, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
index 51f54f6..c24f3c8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/MergeJoinPOperator.java
@@ -44,9 +44,10 @@ import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertie
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
 import org.apache.hyracks.dataflow.std.join.MergeJoinOperatorDescriptor;
 
@@ -56,23 +57,28 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
     protected final List<LogicalVariable> keysLeftBranch;
     protected final List<LogicalVariable> keysRightBranch;
     private final IMergeJoinCheckerFactory mjcf;
-    private IRangeMap rangeMap;
+    private final RangeId leftRangeId;
+    private final RangeId rightRangeId;
+    private final IRangeMap rangeMapHint;
 
     private static final Logger LOGGER = Logger.getLogger(MergeJoinPOperator.class.getName());
 
     public MergeJoinPOperator(JoinKind kind, JoinPartitioningType partitioningType, List<LogicalVariable> sideLeft,
-            List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, IRangeMap rangeMap) {
+            List<LogicalVariable> sideRight, int memSizeInFrames, IMergeJoinCheckerFactory mjcf, RangeId leftRangeId,
+            RangeId rightRangeId, IRangeMap rangeMapHint) {
         super(kind, partitioningType);
         this.memSizeInFrames = memSizeInFrames;
         this.keysLeftBranch = sideLeft;
         this.keysRightBranch = sideRight;
         this.mjcf = mjcf;
-        this.rangeMap = rangeMap;
+        this.leftRangeId = leftRangeId;
+        this.rightRangeId = rightRangeId;
+        this.rangeMapHint = rangeMapHint;
 
         LOGGER.fine("MergeJoinPOperator constructed with: JoinKind=" + kind + ", JoinPartitioningType="
                 + partitioningType + ", List<LogicalVariable>=" + keysLeftBranch + ", List<LogicalVariable>="
                 + keysRightBranch + ", int memSizeInFrames=" + memSizeInFrames + ", IMergeJoinCheckerFactory mjcf="
-                + mjcf + ", IRangeMap rangeMap=" + rangeMap + ".");
+                + mjcf + ", RangeId leftRangeId=" + leftRangeId + ", RangeId rightRangeId=" + rightRangeId + ".");
     }
 
     public List<LogicalVariable> getKeysLeftBranch() {
@@ -87,8 +93,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
         return mjcf;
     }
 
-    public IRangeMap getRangeMap() {
-        return rangeMap;
+    public RangeId getRangeId() {
+        return leftRangeId;
     }
 
     @Override
@@ -107,7 +113,8 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
         for (LogicalVariable v : keysLeftBranch) {
             order.add(new OrderColumn(v, mjcf.isOrderAsc() ? OrderKind.ASC : OrderKind.DESC));
         }
-        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, rangeMap, RangePartitioningType.PROJECT);
+        IPartitioningProperty pp = new OrderedPartitionedProperty(order, null, leftRangeId, RangePartitioningType.PROJECT,
+                rangeMapHint);
         List<ILocalStructuralProperty> propsLocal = new ArrayList<>();
         propsLocal.add(new LocalOrderProperty(order));
         deliveredProperties = new StructuralPropertiesVector(pp, propsLocal);
@@ -137,8 +144,10 @@ public class MergeJoinPOperator extends AbstractJoinPOperator {
         ispRight.add(new LocalOrderProperty(orderRight));
 
         if (op.getExecutionMode() == AbstractLogicalOperator.ExecutionMode.PARTITIONED) {
-            ppLeft = new OrderedPartitionedProperty(orderLeft, null, rangeMap, mjcf.getLeftPartitioningType());
-            ppRight = new OrderedPartitionedProperty(orderRight, null, rangeMap, mjcf.getRightPartitioningType());
+            ppLeft = new OrderedPartitionedProperty(orderLeft, null, leftRangeId, mjcf.getLeftPartitioningType(),
+                    rangeMapHint);
+            ppRight = new OrderedPartitionedProperty(orderRight, null, rightRangeId, mjcf.getRightPartitioningType(),
+                    rangeMapHint);
         }
 
         pv[0] = new StructuralPropertiesVector(ppLeft, ispLeft);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
index 2324c5e..3ace793 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangeForwardPOperator.java
@@ -29,18 +29,28 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties
 import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
 import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.misc.RangeForwardOperatorDescriptor;
 
 public class RangeForwardPOperator extends AbstractPhysicalOperator {
 
+    private RangeId rangeId;
     private IRangeMap rangeMap;
 
-    public RangeForwardPOperator(IRangeMap rangeMap) {
+    public RangeForwardPOperator(RangeId rangeId, IRangeMap rangeMap) {
+        // Use this constructor when a range map hint is provided.
+        this.rangeId = rangeId;
         this.rangeMap = rangeMap;
     }
 
+    public RangeForwardPOperator(RangeId rangeId) {
+        this(rangeId, null);
+    }
+
     public IRangeMap getRangeMap() {
         return rangeMap;
     }
@@ -72,7 +82,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator {
             IOperatorSchema opSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeMap);
+        RecordDescriptor recordDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), opSchema,
+                context);
+        RangeForwardOperatorDescriptor opDesc = new RangeForwardOperatorDescriptor(spec, rangeId, rangeMap,
+                recordDescriptor);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
@@ -82,4 +95,10 @@ public class RangeForwardPOperator extends AbstractPhysicalOperator {
     public boolean expensiveThanMaterialization() {
         return false;
     }
+
+    @Override
+    public String toString() {
+        return getOperatorTag().toString() + " " + rangeId;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
index 8299f61..c58bb0c 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionExchangePOperator.java
@@ -48,25 +48,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitioningConnectorDescriptor;
 
 public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
-    private IRangeMap rangeMap;
+    private RangeId rangeId;
     private RangePartitioningType rangeType;
 
-    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, IRangeMap rangeMap,
+    public RangePartitionExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain, RangeId rangeId,
             RangePartitioningType rangeType) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
-        this.rangeMap = rangeMap;
+        this.rangeId = rangeId;
         this.rangeType = rangeType;
     }
 
@@ -83,8 +83,8 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
         return rangeType;
     }
 
-    public IRangeMap getRangeMap() {
-        return rangeMap;
+    public RangeId getRangeId() {
+        return rangeId;
     }
 
     public INodeDomain getDomain() {
@@ -94,7 +94,7 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
         IPartitioningProperty p = new OrderedPartitionedProperty(new ArrayList<OrderColumn>(partitioningFields), domain,
-                rangeMap, rangeType);
+                rangeId, rangeType, null);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
         List<ILocalStructuralProperty> locals = new ArrayList<>();
@@ -141,15 +141,15 @@ public class RangePartitionExchangePOperator extends AbstractExchangePOperator {
             i++;
         }
         ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
-                rangeMap, rangeType);
-        IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, sortFields, binaryComps,
+                rangeType);
+        IConnectorDescriptor conn = new MToNRangePartitioningConnectorDescriptor(spec, tpcf, rangeId, sortFields, binaryComps,
                 nkcf);
         return new Pair<>(conn, null);
     }
 
     @Override
     public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
+        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
index 9667427..31cd37d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/RangePartitionMergeExchangePOperator.java
@@ -51,25 +51,25 @@ import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
 import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
 import org.apache.hyracks.dataflow.common.data.partition.range.FieldRangePartitionComputerFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 import org.apache.hyracks.dataflow.std.connectors.MToNRangePartitionMergingConnectorDescriptor;
 
 public class RangePartitionMergeExchangePOperator extends AbstractExchangePOperator {
 
     private List<OrderColumn> partitioningFields;
     private INodeDomain domain;
-    private IRangeMap rangeMap;
+    private RangeId rangeId;
     private RangePartitioningType rangeType;
 
     public RangePartitionMergeExchangePOperator(List<OrderColumn> partitioningFields, INodeDomain domain,
-            IRangeMap rangeMap, RangePartitioningType rangeType) {
+            RangeId rangeId, RangePartitioningType rangeType) {
         this.partitioningFields = partitioningFields;
         this.domain = domain;
-        this.rangeMap = rangeMap;
+        this.rangeId = rangeId;
         this.rangeType = rangeType;
     }
 
@@ -86,8 +86,8 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
         return rangeType;
     }
 
-    public IRangeMap getRangeMap() {
-        return rangeMap;
+    public RangeId getRangeId() {
+        return rangeId;
     }
 
     public INodeDomain getDomain() {
@@ -96,7 +96,7 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
 
     @Override
     public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeMap, rangeType);
+        IPartitioningProperty p = new OrderedPartitionedProperty(partitioningFields, domain, rangeId, rangeType, null);
         AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
         List<ILocalStructuralProperty> op2Locals = op2.getDeliveredPhysicalProperties().getLocalProperties();
         List<ILocalStructuralProperty> locals = new ArrayList<>();
@@ -152,15 +152,15 @@ public class RangePartitionMergeExchangePOperator extends AbstractExchangePOpera
             i++;
         }
         ITupleRangePartitionComputerFactory tpcf = new FieldRangePartitionComputerFactory(sortFields, rangeComps,
-                rangeMap, rangeType);
-        IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, sortFields,
+                rangeType);
+        IConnectorDescriptor conn = new MToNRangePartitionMergingConnectorDescriptor(spec, tpcf, rangeId, sortFields,
                 binaryComps, nkcf);
         return new Pair<>(conn, null);
     }
 
     @Override
     public String toString() {
-        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType;
+        return getOperatorTag().toString() + " " + partitioningFields + " " + rangeType + " " + rangeId;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
index c1dcd97..040e663 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/OrderedPartitionedProperty.java
@@ -25,29 +25,27 @@ import java.util.Map;
 
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 
 public class OrderedPartitionedProperty implements IPartitioningProperty {
 
     private List<OrderColumn> orderColumns;
     private INodeDomain domain;
-    private IRangeMap rangeMap;
+    private RangeId rangeId;
     private RangePartitioningType rangeType;
 
-    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap,
-            RangePartitioningType rangeType) {
+    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId,
+            RangePartitioningType rangeType, IRangeMap rangeMapHint) {
         this.domain = domain;
         this.orderColumns = orderColumns;
-        this.rangeMap = rangeMap;
+        this.rangeId = rangeId;
         this.rangeType = rangeType;
     }
 
-    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, IRangeMap rangeMap) {
-        this.domain = domain;
-        this.orderColumns = orderColumns;
-        this.rangeMap = rangeMap;
-        this.rangeType = RangePartitioningType.PROJECT;
+    public OrderedPartitionedProperty(List<OrderColumn> orderColumns, INodeDomain domain, RangeId rangeId) {
+        this(orderColumns, domain, rangeId, RangePartitioningType.PROJECT, null);
     }
 
     public List<OrderColumn> getOrderColumns() {
@@ -81,7 +79,7 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
             List<FunctionalDependency> fds) {
         List<OrderColumn> columns = PropertiesUtil.replaceOrderColumnsByEqClasses(orderColumns, equivalenceClasses);
         columns = PropertiesUtil.applyFDsToOrderColumns(columns, fds);
-        return new OrderedPartitionedProperty(columns, domain, rangeMap);
+        return new OrderedPartitionedProperty(columns, domain, rangeId);
     }
 
     @Override
@@ -91,8 +89,8 @@ public class OrderedPartitionedProperty implements IPartitioningProperty {
         }
     }
 
-    public IRangeMap getRangeMap() {
-        return rangeMap;
+    public RangeId getRangeId() {
+        return rangeId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
index d3e21f7..6400dd1 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/properties/PropertiesUtil.java
@@ -161,11 +161,11 @@ public class PropertiesUtil {
                         if (mayExpandProperties) {
                             return (isPrefixOf(od.getOrderColumns().iterator(), or.getOrderColumns().iterator())
                                     && or.getRangePartitioningType().equals(od.getRangePartitioningType())
-                                    && or.getRangeMap().equals(od.getRangeMap()));
+                                    && or.getRangeId().equals(od.getRangeId()));
                         } else {
                             return (or.getOrderColumns().equals(od.getOrderColumns())
                                     && or.getRangePartitioningType().equals(od.getRangePartitioningType())
-                                    && or.getRangeMap().equals(od.getRangeMap()));
+                                    && or.getRangeId().equals(od.getRangeId()));
                         }
                     }
                     default: {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
index 86f61ad..12b9c66 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java
@@ -42,17 +42,19 @@ import org.apache.hyracks.algebricks.core.algebra.properties.DefaultNodeGroupDom
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector;
 import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 
 @SuppressWarnings({ "unchecked", "rawtypes" })
 public class AlgebricksOptimizationContext implements IOptimizationContext {
 
     private int varCounter;
+    private int rangeIdCounter;
     private final IExpressionEvalSizeComputer expressionEvalSizeComputer;
     private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory;
     private final PhysicalOptimizationConfig physicalOptimizationConfig;
     private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() {
 
-        Map<LogicalVariable, Integer> varSizeMap = new HashMap<LogicalVariable, Integer>();
+        Map<LogicalVariable, Integer> varSizeMap = new HashMap<>();
 
         @Override
         public void setVariableEvalSize(LogicalVariable var, int size) {
@@ -65,19 +67,19 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
         }
     };
 
-    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<ILogicalOperator, IVariableTypeEnvironment>();
+    private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>();
 
-    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<ILogicalOperator, HashSet<ILogicalOperator>>();
-    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<IAlgebraicRewriteRule, HashSet<ILogicalOperator>>();
-    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<LogicalVariable, FunctionalDependency>();
+    private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>();
+    private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>();
+    private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>();
 
     private IMetadataProvider metadataProvider;
-    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<LogicalVariable>();
+    private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>();
 
-    protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<ILogicalOperator, List<FunctionalDependency>>();
-    protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>>();
+    protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>();
+    protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>();
 
-    protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<ILogicalOperator, ILogicalPropertiesVector>();
+    protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>();
     private final IExpressionTypeComputer expressionTypeComputer;
     private final IMissableTypeComputer nullableTypeComputer;
     private final INodeDomain defaultNodeDomain;
@@ -98,6 +100,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
             PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations,
             LogicalOperatorPrettyPrintVisitor prettyPrintVisitor) {
         this.varCounter = varCounter;
+        this.rangeIdCounter = -1;
         this.expressionEvalSizeComputer = expressionEvalSizeComputer;
         this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory;
         this.expressionTypeComputer = expressionTypeComputer;
@@ -125,6 +128,13 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     }
 
     @Override
+    public RangeId newRangeId() {
+        rangeIdCounter++;
+        RangeId id = new RangeId(rangeIdCounter);
+        return id;
+    }
+
+    @Override
     public IMetadataProvider getMetadataProvider() {
         return metadataProvider;
     }
@@ -148,7 +158,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     public void addToDontApplySet(IAlgebraicRewriteRule rule, ILogicalOperator op) {
         HashSet<ILogicalOperator> operators = dontApply.get(rule);
         if (operators == null) {
-            HashSet<ILogicalOperator> os = new HashSet<ILogicalOperator>();
+            HashSet<ILogicalOperator> os = new HashSet<>();
             os.add(op);
             dontApply.put(rule, os);
         } else {
@@ -164,7 +174,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     public boolean checkAndAddToAlreadyCompared(ILogicalOperator op1, ILogicalOperator op2) {
         HashSet<ILogicalOperator> ops = alreadyCompared.get(op1);
         if (ops == null) {
-            HashSet<ILogicalOperator> newEntry = new HashSet<ILogicalOperator>();
+            HashSet<ILogicalOperator> newEntry = new HashSet<>();
             newEntry.add(op2);
             alreadyCompared.put(op1, newEntry);
             return false;
@@ -204,9 +214,9 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     public List<LogicalVariable> findPrimaryKey(LogicalVariable recordVar) {
         FunctionalDependency fd = varToPrimaryKey.get(recordVar);
         if (fd == null) {
-            return null;
+            return new ArrayList<>();
         }
-        return new ArrayList<LogicalVariable>(fd.getHead());
+        return new ArrayList<>(fd.getHead());
     }
 
     @Override
@@ -299,7 +309,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
     public void updatePrimaryKeys(Map<LogicalVariable, LogicalVariable> mappedVars) {
         for (Map.Entry<LogicalVariable, FunctionalDependency> me : varToPrimaryKey.entrySet()) {
             FunctionalDependency fd = me.getValue();
-            List<LogicalVariable> hd = new ArrayList<LogicalVariable>();
+            List<LogicalVariable> hd = new ArrayList<>();
             for (LogicalVariable v : fd.getHead()) {
                 LogicalVariable v2 = mappedVars.get(v);
                 if (v2 == null) {
@@ -308,7 +318,7 @@ public class AlgebricksOptimizationContext implements IOptimizationContext {
                     hd.add(v2);
                 }
             }
-            List<LogicalVariable> tl = new ArrayList<LogicalVariable>();
+            List<LogicalVariable> tl = new ArrayList<>();
             for (LogicalVariable v : fd.getTail()) {
                 LogicalVariable v2 = mappedVars.get(v);
                 if (v2 == null) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
index 44e04b5..f6b1796 100644
--- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
+++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
@@ -21,7 +21,7 @@ package org.apache.hyracks.algebricks.data;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 
 public interface IBinaryComparatorFactoryProvider {
     public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending)

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
index 31d1099..4ec5e27 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/EnforceStructuralPropertiesRule.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder.OrderKind;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.FDsAndEquivClassesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractStableSortPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.BroadcastExchangePOperator;
@@ -60,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreSortedDi
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.PreclusteredGroupByPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RandomPartitionExchangePOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangePartitionMergeExchangePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SortMergeExchangePOperator;
@@ -89,8 +91,9 @@ import org.apache.hyracks.algebricks.core.config.AlgebricksConfig;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig;
 import org.apache.hyracks.algebricks.rewriter.util.PhysicalOptimizationsUtil;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangeMap;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
+import org.apache.hyracks.dataflow.std.base.RangeId;
 
 public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
@@ -155,8 +158,8 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
     }
 
     // Gets the index of a child to start top-down data property enforcement.
-    // If there is a partitioning-compatible child with the operator in opRef, start from this child;
-    // otherwise, start from child zero.
+    // If there is a partitioning-compatible child with the operator in opRef,
+    // start from this child; otherwise, start from child zero.
     private int getStartChildIndex(AbstractLogicalOperator op, PhysicalRequirements pr, boolean nestedPlan,
             IOptimizationContext context) throws AlgebricksException {
         IPhysicalPropertiesVector[] reqdProperties = null;
@@ -273,8 +276,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                 changed = true;
                 addEnforcers(op, childIndex, diff, rqd, delivered, childrenDomain, nestedPlan, context);
 
-                AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex)
-                        .getValue();
+                AbstractLogicalOperator newChild = (AbstractLogicalOperator) op.getInputs().get(childIndex).getValue();
 
                 if (newChild != child) {
                     delivered = newChild.getDeliveredPhysicalProperties();
@@ -333,7 +335,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
     private IPhysicalPropertiesVector newPropertiesDiff(AbstractLogicalOperator newChild,
             IPhysicalPropertiesVector required, boolean mayExpandPartitioningProperties, IOptimizationContext context)
-                    throws AlgebricksException {
+            throws AlgebricksException {
         IPhysicalPropertiesVector newDelivered = newChild.getDeliveredPhysicalProperties();
 
         Map<LogicalVariable, EquivalenceClass> newChildEqClasses = context.getEquivalenceClassMap(newChild);
@@ -373,14 +375,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                 preSortedDistinct.setDistinctByColumns(d.getDistinctByVarList());
                 break;
             }
+            default:
         }
     }
 
     private List<OrderColumn> getOrderColumnsFromGroupingProperties(List<ILocalStructuralProperty> reqd,
             List<ILocalStructuralProperty> dlvd) {
-        List<OrderColumn> returnedProperties = new ArrayList<OrderColumn>();
-        List<LogicalVariable> rqdCols = new ArrayList<LogicalVariable>();
-        List<LogicalVariable> dlvdCols = new ArrayList<LogicalVariable>();
+        List<OrderColumn> returnedProperties = new ArrayList<>();
+        List<LogicalVariable> rqdCols = new ArrayList<>();
+        List<LogicalVariable> dlvdCols = new ArrayList<>();
         for (ILocalStructuralProperty r : reqd) {
             r.getVariables(rqdCols);
         }
@@ -389,7 +392,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         }
 
         int prefix = dlvdCols.size() - 1;
-        for (; prefix >= 0;) {
+        while (prefix >= 0) {
             if (!rqdCols.contains(dlvdCols.get(prefix))) {
                 prefix--;
             } else {
@@ -403,7 +406,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             returnedProperties.add(new OrderColumn(orderColumns.get(j).getColumn(), orderColumns.get(j).getOrder()));
         }
         // maintain other order columns after the required order columns
-        if (returnedProperties.size() != 0) {
+        if (!returnedProperties.isEmpty()) {
             for (int j = prefix + 1; j < dlvdCols.size(); j++) {
                 OrderColumn oc = orderColumns.get(j);
                 returnedProperties.add(new OrderColumn(oc.getColumn(), oc.getOrder()));
@@ -464,9 +467,9 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             return;
         }
 
-        Mutable<ILogicalOperator> topOp = new MutableObject<ILogicalOperator>();
+        Mutable<ILogicalOperator> topOp = new MutableObject<>();
         topOp.setValue(op.getInputs().get(i).getValue());
-        LinkedList<LocalOrderProperty> oList = new LinkedList<LocalOrderProperty>();
+        LinkedList<LocalOrderProperty> oList = new LinkedList<>();
 
         for (ILocalStructuralProperty prop : localProperties) {
             switch (prop.getPropertyType()) {
@@ -478,7 +481,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                     LocalGroupingProperty g = (LocalGroupingProperty) prop;
                     Collection<LogicalVariable> vars = (g.getPreferredOrderEnforcer() != null)
                             ? g.getPreferredOrderEnforcer() : g.getColumnSet();
-                    List<OrderColumn> orderColumns = new ArrayList<OrderColumn>();
+                    List<OrderColumn> orderColumns = new ArrayList<>();
                     for (LogicalVariable v : vars) {
                         OrderColumn oc = new OrderColumn(v, OrderKind.ASC);
                         orderColumns.add(oc);
@@ -504,12 +507,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
 
     private Mutable<ILogicalOperator> enforceOrderProperties(List<LocalOrderProperty> oList,
             Mutable<ILogicalOperator> topOp, boolean isMicroOp, IOptimizationContext context)
-                    throws AlgebricksException {
-        List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<Pair<IOrder, Mutable<ILogicalExpression>>>();
+            throws AlgebricksException {
+        List<Pair<IOrder, Mutable<ILogicalExpression>>> oe = new LinkedList<>();
         for (LocalOrderProperty orderProperty : oList) {
             for (OrderColumn oc : orderProperty.getOrderColumns()) {
                 IOrder ordType = (oc.getOrder() == OrderKind.ASC) ? OrderOperator.ASC_ORDER : OrderOperator.DESC_ORDER;
-                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<IOrder, Mutable<ILogicalExpression>>(ordType,
+                Pair<IOrder, Mutable<ILogicalExpression>> pair = new Pair<>(ordType,
                         new MutableObject<ILogicalExpression>(new VariableReferenceExpression(oc.getColumn())));
                 oe.add(pair);
             }
@@ -537,13 +540,15 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
             switch (pp.getPartitioningType()) {
                 case UNPARTITIONED: {
                     List<OrderColumn> ordCols = computeOrderColumns(deliveredByChild);
-                    if (ordCols == null || ordCols.size() == 0) {
+                    if (ordCols.isEmpty()) {
                         pop = new RandomMergeExchangePOperator();
                     } else {
                         if (op.getAnnotations().containsKey(OperatorAnnotations.USE_RANGE_CONNECTOR)) {
+                            RangeId rangeId = context.newRangeId();
                             IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
                                     .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
-                            pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeMap,
+                            addRangeForwardOperator(op.getInputs().get(i), rangeId, rangeMap, context);
+                            pop = new RangePartitionMergeExchangePOperator(ordCols, domain, rangeId,
                                     RangePartitioningType.PROJECT);
                         } else {
                             OrderColumn[] sortColumns = new OrderColumn[ordCols.size()];
@@ -554,8 +559,7 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                     break;
                 }
                 case UNORDERED_PARTITIONED: {
-                    List<LogicalVariable> varList = new ArrayList<LogicalVariable>(
-                            ((UnorderedPartitionedProperty) pp).getColumnSet());
+                    List<LogicalVariable> varList = new ArrayList<>(((UnorderedPartitionedProperty) pp).getColumnSet());
                     List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
                     List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
                     boolean propWasSet = false;
@@ -580,6 +584,12 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                     OrderedPartitionedProperty opp = (OrderedPartitionedProperty) pp;
                     List<ILocalStructuralProperty> cldLocals = deliveredByChild.getLocalProperties();
                     List<ILocalStructuralProperty> reqdLocals = required.getLocalProperties();
+
+                    // Add RangeForwardOperator.
+                    IRangeMap rangeMap = (IRangeMap) op.getAnnotations()
+                            .get(OperatorAnnotations.USE_RANGE_CONNECTOR);
+                    addRangeForwardOperator(op.getInputs().get(i), opp.getRangeId(), rangeMap, context);
+
                     boolean propWasSet = false;
                     pop = null;
                     if (reqdLocals != null && cldLocals != null && allAreOrderProps(cldLocals)) {
@@ -589,13 +599,13 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
                         if (PropertiesUtil.matchLocalProperties(reqdLocals, cldLocals, ecs, fds)) {
                             List<OrderColumn> orderColumns = getOrderColumnsFromGroupingProperties(reqdLocals,
                                     cldLocals);
-                            pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeMap(),
+                            pop = new RangePartitionMergeExchangePOperator(orderColumns, domain, opp.getRangeId(),
                                     opp.getRangePartitioningType());
                             propWasSet = true;
                         }
                     }
                     if (!propWasSet) {
-                        pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeMap(),
+                        pop = new RangePartitionExchangePOperator(opp.getOrderColumns(), domain, opp.getRangeId(),
                                 opp.getRangePartitioningType());
                     }
                     break;
@@ -630,6 +640,21 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
         }
     }
 
+    private void addRangeForwardOperator(Mutable<ILogicalOperator> op, RangeId rangeId, IRangeMap rangeMap,
+            IOptimizationContext context) throws AlgebricksException {
+        RangeForwardOperator rfo = new RangeForwardOperator(rangeId, rangeMap);
+        RangeForwardPOperator rfpo = new RangeForwardPOperator(rangeId, rangeMap);
+        rfo.setPhysicalOperator(rfpo);
+        setNewOp(op, rfo, context);
+        rfo.setExecutionMode(AbstractLogicalOperator.ExecutionMode.PARTITIONED);
+        OperatorPropertiesUtil.computeSchemaAndPropertiesRecIfNull(rfo, context);
+        context.computeAndSetTypeEnvironmentForOperator(rfo);
+        if (AlgebricksConfig.DEBUG) {
+            AlgebricksConfig.ALGEBRICKS_LOGGER.fine(">>>> Added range forward " + rfo.getPhysicalOperator() + ".\n");
+            printOp((AbstractLogicalOperator) op.getValue());
+        }
+    }
+
     private boolean allAreOrderProps(List<ILocalStructuralProperty> cldLocals) {
         for (ILocalStructuralProperty lsp : cldLocals) {
             if (lsp.getPropertyType() != PropertyType.LOCAL_ORDER_PROPERTY) {
@@ -647,17 +672,17 @@ public class EnforceStructuralPropertiesRule implements IAlgebraicRewriteRule {
     }
 
     private List<OrderColumn> computeOrderColumns(IPhysicalPropertiesVector pv) {
-        List<OrderColumn> ordCols = new ArrayList<OrderColumn>();
+        List<OrderColumn> ordCols = new ArrayList<>();
         List<ILocalStructuralProperty> localProps = pv.getLocalProperties();
-        if (localProps == null || localProps.size() == 0) {
-            return null;
+        if (localProps == null || localProps.isEmpty()) {
+            return new ArrayList<>();
         } else {
             for (ILocalStructuralProperty p : localProps) {
                 if (p.getPropertyType() == PropertyType.LOCAL_ORDER_PROPERTY) {
                     LocalOrderProperty lop = (LocalOrderProperty) p;
                     ordCols.addAll(lop.getOrderColumns());
                 } else {
-                    return null;
+                    return new ArrayList<>();
                 }
             }
             return ordCols;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
index e7cf912..101ec42 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/InlineVariablesRule.java
@@ -69,13 +69,13 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
 
     // Map of variables that could be replaced by their producing expression.
     // Populated during the top-down sweep of the plan.
-    protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<LogicalVariable, ILogicalExpression>();
+    protected Map<LogicalVariable, ILogicalExpression> varAssignRhs = new HashMap<>();
 
     // Visitor for replacing variable reference expressions with their originating expression.
     protected InlineVariablesVisitor inlineVisitor = new InlineVariablesVisitor(varAssignRhs);
 
     // Set of FunctionIdentifiers that we should not inline.
-    protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<FunctionIdentifier>();
+    protected Set<FunctionIdentifier> doNotInlineFuncs = new HashSet<>();
 
     protected boolean hasRun = false;
 
@@ -127,9 +127,9 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
     public static boolean functionIsConstantAtRuntime(AbstractLogicalOperator op,
             AbstractFunctionCallExpression funcExpr, IOptimizationContext context) throws AlgebricksException {
         //make sure that there are no variables in the expression
-        Set<LogicalVariable> usedVariables = new HashSet<LogicalVariable>();
+        Set<LogicalVariable> usedVariables = new HashSet<>();
         funcExpr.getUsedVariables(usedVariables);
-        if (usedVariables.size() > 0) {
+        if (!usedVariables.isEmpty()) {
             return false;
         }
         return true;
@@ -176,7 +176,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
                     }
                     // Variables produced by a nested subplan cannot be inlined
                     // in operators above the subplan.
-                    Set<LogicalVariable> producedVars = new HashSet<LogicalVariable>();
+                    Set<LogicalVariable> producedVars = new HashSet<>();
                     VariableUtilities.getProducedVariables(root.getValue(), producedVars);
                     varAssignRhs.keySet().removeAll(producedVars);
                 }
@@ -186,7 +186,7 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
         // References to variables generated in the right branch of a left-outer-join cannot be inlined
         // in operators above the left-outer-join.
         if (op.getOperatorTag() == LogicalOperatorTag.LEFTOUTERJOIN) {
-            Set<LogicalVariable> rightLiveVars = new HashSet<LogicalVariable>();
+            Set<LogicalVariable> rightLiveVars = new HashSet<>();
             VariableUtilities.getLiveVariables(op.getInputs().get(1).getValue(), rightLiveVars);
             varAssignRhs.keySet().removeAll(rightLiveVars);
         }
@@ -208,8 +208,8 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
     protected class InlineVariablesVisitor implements ILogicalExpressionReferenceTransform {
 
         private final Map<LogicalVariable, ILogicalExpression> varAssignRhs;
-        private final Set<LogicalVariable> liveVars = new HashSet<LogicalVariable>();
-        private final List<LogicalVariable> rhsUsedVars = new ArrayList<LogicalVariable>();
+        private final Set<LogicalVariable> liveVars = new HashSet<>();
+        private final List<LogicalVariable> rhsUsedVars = new ArrayList<>();
         private ILogicalOperator op;
         private IOptimizationContext context;
         // If set, only replace this variable reference.
@@ -236,54 +236,60 @@ public class InlineVariablesRule implements IAlgebraicRewriteRule {
         public boolean transform(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
             ILogicalExpression e = exprRef.getValue();
             switch (((AbstractLogicalExpression) e).getExpressionTag()) {
-                case VARIABLE: {
-                    LogicalVariable var = ((VariableReferenceExpression) e).getVariableReference();
-                    // Restrict replacement to targetVar if it has been set.
-                    if (targetVar != null && var != targetVar) {
-                        return false;
-                    }
+                case VARIABLE:
+                    return transformVariable(exprRef);
+                case FUNCTION_CALL:
+                    return transformFunction(e);
+                default:
+                    return false;
 
-                    // Make sure has not been excluded from inlining.
-                    if (context.shouldNotBeInlined(var)) {
-                        return false;
-                    }
+            }
+        }
 
-                    ILogicalExpression rhs = varAssignRhs.get(var);
-                    if (rhs == null) {
-                        // Variable was not produced by an assign.
-                        return false;
-                    }
+        private boolean transformFunction(ILogicalExpression e) throws AlgebricksException {
+            AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
+            boolean modified = false;
+            for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
+                if (transform(arg)) {
+                    modified = true;
+                }
+            }
+            return modified;
+        }
 
-                    // Make sure used variables from rhs are live.
-                    if (liveVars.isEmpty()) {
-                        VariableUtilities.getLiveVariables(op, liveVars);
-                    }
-                    rhsUsedVars.clear();
-                    rhs.getUsedVariables(rhsUsedVars);
-                    for (LogicalVariable rhsUsedVar : rhsUsedVars) {
-                        if (!liveVars.contains(rhsUsedVar)) {
-                            return false;
-                        }
-                    }
+        private boolean transformVariable(Mutable<ILogicalExpression> exprRef) throws AlgebricksException {
+            LogicalVariable var = ((VariableReferenceExpression) exprRef.getValue()).getVariableReference();
+            // Restrict replacement to targetVar if it has been set.
+            if (targetVar != null && var != targetVar) {
+                return false;
+            }
 
-                    // Replace variable reference with a clone of the rhs expr.
-                    exprRef.setValue(rhs.cloneExpression());
-                    return true;
-                }
-                case FUNCTION_CALL: {
-                    AbstractFunctionCallExpression fce = (AbstractFunctionCallExpression) e;
-                    boolean modified = false;
-                    for (Mutable<ILogicalExpression> arg : fce.getArguments()) {
-                        if (transform(arg)) {
-                            modified = true;
-                        }
-                    }
-                    return modified;
-                }
-                default: {
+            // Make sure the variable has not been excluded from inlining.
+            if (context.shouldNotBeInlined(var)) {
+                return false;
+            }
+
+            ILogicalExpression rhs = varAssignRhs.get(var);
+            if (rhs == null) {
+                // Variable was not produced by an assign.
+                return false;
+            }
+
+            // Make sure used variables from rhs are live.
+            if (liveVars.isEmpty()) {
+                VariableUtilities.getLiveVariables(op, liveVars);
+            }
+            rhsUsedVars.clear();
+            rhs.getUsedVariables(rhsUsedVars);
+            for (LogicalVariable rhsUsedVar : rhsUsedVars) {
+                if (!liveVars.contains(rhsUsedVar)) {
                     return false;
                 }
             }
+
+            // Replace variable reference with a clone of the rhs expr.
+            exprRef.setValue(rhs.cloneExpression());
+            return true;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 7eb9ac8..a54947a 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -49,6 +49,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleS
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
@@ -184,6 +185,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
     }
 
     @Override
+    public ILogicalOperator visitRangeForwardOperator(RangeForwardOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
new file mode 100644
index 0000000..cd14434
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangeMap.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+public interface IRangeMap {
+
+    public int getSplitCount();
+
+    public byte[] getByteArray(int columnIndex, int splitIndex);
+
+    public int getStartOffset(int columnIndex, int splitIndex);
+
+    public int getLength(int columnIndex, int splitIndex);
+
+    public int getTag(int columnIndex, int splitIndex);
+
+    // Min value functions
+    public byte[] getMinByteArray(int columnIndex);
+
+    public int getMinStartOffset(int columnIndex);
+
+    public int getMinLength(int columnIndex);
+
+    public int getMinTag(int columnIndex);
+
+    // Max value functions
+    public byte[] getMaxByteArray(int columnIndex);
+
+    public int getMaxStartOffset(int columnIndex);
+
+    public int getMaxLength(int columnIndex);
+
+    public int getMaxTag(int columnIndex);
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
new file mode 100644
index 0000000..abf1495
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IRangePartitionType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+public interface IRangePartitionType {
+    public enum RangePartitioningType {
+        /**
+         * Partitioning is determined by finding the range partition where the first data point lies.
+         */
+        PROJECT,
+        /**
+         * Partitioning is determined by finding the range partition where the last data point lies.
+         */
+        PROJECT_END,
+        /**
+         * Partitioning is determined by finding all the range partitions where the data has a point.
+         */
+        SPLIT,
+        /**
+         * Partitioning is determined by finding the range partition where the first data point lies
+         * and all the range partitions that come after it.
+         */
+        REPLICATE
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
index 5406366..9b3e607 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITupleRangePartitionComputerFactory.java
@@ -21,5 +21,5 @@ package org.apache.hyracks.api.dataflow.value;
 import java.io.Serializable;
 
 public interface ITupleRangePartitionComputerFactory extends Serializable {
-    public ITupleRangePartitionComputer createPartitioner();
+    public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2061a388/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
index bd7be3f..162d3b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/range/FieldRangePartitionComputerFactory.java
@@ -21,28 +21,27 @@ package org.apache.hyracks.dataflow.common.data.partition.range;
 import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
 import org.apache.hyracks.api.dataflow.value.IBinaryRangeComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.IRangeMap;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITupleRangePartitionComputerFactory;
+import org.apache.hyracks.api.dataflow.value.IRangePartitionType.RangePartitioningType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.storage.IGrowableIntArray;
-import org.apache.hyracks.dataflow.common.data.partition.range.IRangePartitionType.RangePartitioningType;
 
 public class FieldRangePartitionComputerFactory implements ITupleRangePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
     private final int[] rangeFields;
-    private IRangeMap rangeMap;
     private IBinaryRangeComparatorFactory[] comparatorFactories;
     private RangePartitioningType rangeType;
 
     public FieldRangePartitionComputerFactory(int[] rangeFields, IBinaryRangeComparatorFactory[] comparatorFactories,
-            IRangeMap rangeMap, RangePartitioningType rangeType) {
+            RangePartitioningType rangeType) {
         this.rangeFields = rangeFields;
         this.comparatorFactories = comparatorFactories;
-        this.rangeMap = rangeMap;
         this.rangeType = rangeType;
     }
 
-    public ITupleRangePartitionComputer createPartitioner() {
+    public ITupleRangePartitionComputer createPartitioner(IRangeMap rangeMap) {
         final IBinaryComparator[] minComparators = new IBinaryComparator[comparatorFactories.length];
         for (int i = 0; i < comparatorFactories.length; ++i) {
             minComparators[i] = comparatorFactories[i].createMinBinaryComparator();
@@ -78,7 +77,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
                     throws HyracksDataException {
                 switch (rangeType) {
                     case PROJECT: {
-                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        int minPartition = getPartitionMap(
+                                binarySearchRangePartition(accessor, tIndex, minComparators));
                         addPartition(minPartition, map);
                         break;
                     }
@@ -89,7 +89,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
                         break;
                     }
                     case REPLICATE: {
-                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        int minPartition = getPartitionMap(
+                                binarySearchRangePartition(accessor, tIndex, minComparators));
                         int maxPartition = getPartitionMap(rangeMap.getSplitCount() + 1);
                         for (int pid = minPartition; pid < maxPartition; ++pid) {
                             addPartition(pid, map);
@@ -97,7 +98,8 @@ public class FieldRangePartitionComputerFactory implements ITupleRangePartitionC
                         break;
                     }
                     case SPLIT: {
-                        int minPartition = getPartitionMap(binarySearchRangePartition(accessor, tIndex, minComparators));
+                        int minPartition = getPartitionMap(
+                                binarySearchRangePartition(accessor, tIndex, minComparators));
                         int maxPartition = getPartitionMap(
                                 binarySearchRangePartition(accessor, tIndex, maxComparators));
                         for (int pid = minPartition; pid <= maxPartition; ++pid) {