You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by wa...@apache.org on 2016/10/12 00:09:34 UTC

[1/2] asterixdb git commit: Index-only plan step 2: Added SplitOperator

Repository: asterixdb
Updated Branches:
  refs/heads/master db1c115ec -> 76a4f9e36


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
index 6501aeb..74739da 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ReplicatePOperator.java
@@ -19,24 +19,19 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.physical;
 
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
-import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
-import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
-import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
 import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 
-public class ReplicatePOperator extends AbstractPhysicalOperator {
+public class ReplicatePOperator extends AbstractReplicatePOperator {
 
     @Override
     public PhysicalOperatorTag getOperatorTag() {
@@ -44,55 +39,25 @@ public class ReplicatePOperator extends AbstractPhysicalOperator {
     }
 
     @Override
-    public boolean isMicroOperator() {
-        return false;
-    }
-
-    @Override
-    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
-            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
-        return emptyUnaryRequirements();
-    }
-
-    @Override
-    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
-        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
-        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
-    }
-
-    @Override
     public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
             IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
             throws AlgebricksException {
         IOperatorDescriptorRegistry spec = builder.getJobSpec();
-        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
 
         ReplicateOperator rop = (ReplicateOperator) op;
         int outputArity = rop.getOutputArity();
         boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
 
-        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity, outputMaterializationFlags);
+        ReplicateOperatorDescriptor splitOpDesc = new ReplicateOperatorDescriptor(spec, recDescriptor, outputArity,
+                outputMaterializationFlags);
         contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
         ILogicalOperator src = op.getInputs().get(0).getValue();
         builder.contributeGraphEdge(src, 0, op, 0);
     }
 
     @Override
-    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
-        int[] inputDependencyLabels = new int[] { 0 };
-        ReplicateOperator rop = (ReplicateOperator) op;
-        int[] outputDependencyLabels = new int[rop.getOutputArity()];
-        // change the labels of outputs that requires materialization to 1
-        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
-        for (int i = 0; i < rop.getOutputArity(); i++) {
-            if (outputMaterializationFlags[i]) {
-                outputDependencyLabels[i] = 1;
-            }
-        }
-        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
-    }
-
-    @Override
     public boolean expensiveThanMaterialization() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
new file mode 100644
index 0000000..3b8aaab
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SplitPOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+public class SplitPOperator extends AbstractReplicatePOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SPLIT;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        SplitOperator sop = (SplitOperator) op;
+        int outputArity = sop.getOutputArity();
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
+                propagatedSchema, context);
+
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory brachingExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(
+                sop.getBranchingExpression().getValue(), context.getTypeEnvironment(op), inputSchemas, context);
+
+        IBinaryIntegerInspectorFactory intInsepctorFactory = context.getBinaryIntegerInspectorFactory();
+
+        SplitOperatorDescriptor splitOpDesc = new SplitOperatorDescriptor(spec, recDescriptor, outputArity,
+                brachingExprEvalFactory, intInsepctorFactory);
+
+        contributeOpDesc(builder, (AbstractLogicalOperator) op, splitOpDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index 7e83880..d3dd166 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -41,6 +41,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOpera
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
@@ -49,13 +50,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -63,7 +64,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator.Kind;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionVisitor;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -236,15 +236,6 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Integer indent)
-            throws AlgebricksException {
-        addIndent(indent).append("partitioning-split (");
-        pprintExprList(op.getExpressions(), indent);
-        buffer.append(")");
-        return null;
-    }
-
-    @Override
     public Void visitSubplanOperator(SubplanOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("subplan {");
         printNestedPlans(op, indent);
@@ -363,6 +354,13 @@ public class LogicalOperatorPrettyPrintVisitor implements ILogicalOperatorVisito
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Integer indent) throws AlgebricksException {
+        Mutable<ILogicalExpression> branchingExpression = op.getBranchingExpression();
+        addIndent(indent).append("split " + branchingExpression.getValue().accept(exprVisitor, indent));
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("materialize");
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index c0f9718..f5ff8b4 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -34,18 +34,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -82,10 +82,10 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitProjectOperator(ProjectOperator op, T arg) throws AlgebricksException;
 
-    public R visitPartitioningSplitOperator(PartitioningSplitOperator op, T arg) throws AlgebricksException;
-
     public R visitReplicateOperator(ReplicateOperator op, T arg) throws AlgebricksException;
 
+    public R visitSplitOperator(SplitOperator op, T arg) throws AlgebricksException;
+
     public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException;
 
     public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 39cac06..c5d7291 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -48,12 +48,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,12 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
     }
 
     @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
-            throws AlgebricksException {
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }
 
     @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
         return visit(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
deleted file mode 100644
index 2d5c929..0000000
--- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/PartitioningSplitOperatorDescriptor.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.runtime.operators.std;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.data.IBinaryBooleanInspector;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.api.comm.IFrame;
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.data.std.api.IPointable;
-import org.apache.hyracks.data.std.primitive.VoidPointable;
-import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
-import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-
-public class PartitioningSplitOperatorDescriptor extends AbstractSingleActivityOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-    public static int NO_DEFAULT_BRANCH = -1;
-
-    private final IScalarEvaluatorFactory[] evalFactories;
-    private final IBinaryBooleanInspector boolInspector;
-    private final int defaultBranchIndex;
-
-    public PartitioningSplitOperatorDescriptor(IOperatorDescriptorRegistry spec,
-            IScalarEvaluatorFactory[] evalFactories, IBinaryBooleanInspector boolInspector, int defaultBranchIndex,
-            RecordDescriptor rDesc) {
-        super(spec, 1, (defaultBranchIndex == evalFactories.length) ? evalFactories.length + 1 : evalFactories.length);
-        for (int i = 0; i < evalFactories.length; i++) {
-            recordDescriptors[i] = rDesc;
-        }
-        this.evalFactories = evalFactories;
-        this.boolInspector = boolInspector;
-        this.defaultBranchIndex = defaultBranchIndex;
-    }
-
-    @Override
-    public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-            final IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions)
-                    throws HyracksDataException {
-        return new AbstractUnaryInputOperatorNodePushable() {
-            private final IFrameWriter[] writers = new IFrameWriter[outputArity];
-            private final boolean[] isOpen = new boolean[outputArity];
-            private final IFrame[] writeBuffers = new IFrame[outputArity];
-            private final IScalarEvaluator[] evals = new IScalarEvaluator[outputArity];
-            private final IPointable evalPointable = new VoidPointable();
-            private final RecordDescriptor inOutRecDesc = recordDescProvider.getInputRecordDescriptor(getActivityId(),
-                    0);
-            private final FrameTupleAccessor accessor = new FrameTupleAccessor(inOutRecDesc);
-            private final FrameTupleReference frameTuple = new FrameTupleReference();
-
-            private final FrameTupleAppender tupleAppender = new FrameTupleAppender();
-            private final ArrayTupleBuilder tupleBuilder = new ArrayTupleBuilder(inOutRecDesc.getFieldCount());
-            private final DataOutput tupleDos = tupleBuilder.getDataOutput();
-
-            @Override
-            public void close() throws HyracksDataException {
-                HyracksDataException hde = null;
-                for (int i = 0; i < outputArity; i++) {
-                    if (isOpen[i]) {
-                        try {
-                            tupleAppender.reset(writeBuffers[i], false);
-                            tupleAppender.write(writers[i], false);
-                        } catch (Throwable th) {
-                            if (hde == null) {
-                                hde = new HyracksDataException();
-                            }
-                            hde.addSuppressed(th);
-                        } finally {
-                            try {
-                                writers[i].close();
-                            } catch (Throwable th) {
-                                if (hde == null) {
-                                    hde = new HyracksDataException();
-                                }
-                                hde.addSuppressed(th);
-                            }
-                        }
-                    }
-                }
-                if (hde != null) {
-                    throw hde;
-                }
-            }
-
-            @Override
-            public void flush() throws HyracksDataException {
-                for (int i = 0; i < outputArity; i++) {
-                    tupleAppender.reset(writeBuffers[i], false);
-                    tupleAppender.flush(writers[i]);
-                }
-            }
-
-            @Override
-            public void fail() throws HyracksDataException {
-                HyracksDataException hde = null;
-                for (int i = 0; i < outputArity; i++) {
-                    if (isOpen[i]) {
-                        try {
-                            writers[i].fail();
-                        } catch (Throwable th) {
-                            if (hde == null) {
-                                hde = new HyracksDataException();
-                            }
-                            hde.addSuppressed(th);
-                        }
-                    }
-                }
-                if (hde != null) {
-                    throw hde;
-                }
-            }
-
-            @Override
-            public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                accessor.reset(buffer);
-                int tupleCount = accessor.getTupleCount();
-                for (int i = 0; i < tupleCount; i++) {
-                    frameTuple.reset(accessor, i);
-                    boolean found = false;
-                    for (int j = 0; j < evals.length; j++) {
-                        try {
-                            evals[j].evaluate(frameTuple, evalPointable);
-                        } catch (AlgebricksException e) {
-                            throw new HyracksDataException(e);
-                        }
-                        found = boolInspector.getBooleanValue(evalPointable.getByteArray(),
-                                evalPointable.getStartOffset(), evalPointable.getLength());
-                        if (found) {
-                            copyAndAppendTuple(j);
-                            break;
-                        }
-                    }
-                    // Optionally write to default output branch.
-                    if (!found && defaultBranchIndex != NO_DEFAULT_BRANCH) {
-                        copyAndAppendTuple(defaultBranchIndex);
-                    }
-                }
-            }
-
-            private void copyAndAppendTuple(int outputIndex) throws HyracksDataException {
-                // Copy tuple into tuple builder.
-                try {
-                    tupleBuilder.reset();
-                    for (int i = 0; i < frameTuple.getFieldCount(); i++) {
-                        tupleDos.write(frameTuple.getFieldData(i), frameTuple.getFieldStart(i),
-                                frameTuple.getFieldLength(i));
-                        tupleBuilder.addFieldEndOffset();
-                    }
-                } catch (IOException e) {
-                    throw new HyracksDataException(e);
-                }
-                tupleAppender.reset(writeBuffers[outputIndex], false);
-                FrameUtils.appendToWriter(writers[outputIndex], tupleAppender, tupleBuilder.getFieldEndOffsets(),
-                        tupleBuilder.getByteArray(), 0, tupleBuilder.getSize());
-            }
-
-            @Override
-            public void open() throws HyracksDataException {
-                for (int i = 0; i < writers.length; i++) {
-                    isOpen[i] = true;
-                    writers[i].open();
-                }
-                // Create write buffers.
-                for (int i = 0; i < outputArity; i++) {
-                    writeBuffers[i] = new VSizeFrame(ctx);
-                    // Make sure to clear all buffers, since we are reusing the tupleAppender.
-                    tupleAppender.reset(writeBuffers[i], true);
-                }
-                // Create evaluators for partitioning.
-                try {
-                    for (int i = 0; i < evalFactories.length; i++) {
-                        evals[i] = evalFactories[i].createScalarEvaluator(ctx);
-                    }
-                } catch (AlgebricksException e) {
-                    throw new HyracksDataException(e);
-                }
-            }
-
-            @Override
-            public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                writers[index] = writer;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
new file mode 100644
index 0000000..2215a96
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SplitOperatorDescriptor.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+/**
+ * Split operator propagates each tuple in a frame to one output branch only unlike Replicate operator.
+ */
+public class SplitOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    private IScalarEvaluatorFactory brachingExprEvalFactory;
+    private IBinaryIntegerInspectorFactory intInsepctorFactory;
+
+    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            IScalarEvaluatorFactory brachingExprEvalFactory, IBinaryIntegerInspectorFactory intInsepctorFactory) {
+        super(spec, rDesc, outputArity);
+        this.brachingExprEvalFactory = brachingExprEvalFactory;
+        this.intInsepctorFactory = intInsepctorFactory;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SplitterMaterializerActivityNode sma = new SplitterMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        for (int i = 0; i < outputArity; i++) {
+            builder.addTargetEdge(i, sma, i);
+        }
+    }
+
+    // The difference between SplitterMaterializerActivityNode and ReplicatorMaterializerActivityNode is that
+    // SplitterMaterializerActivityNode propagates each tuple to one output branch only.
+    private final class SplitterMaterializerActivityNode extends ReplicatorMaterializerActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SplitterMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+            final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
+            final IPointable p = VoidPointable.FACTORY.createPointable();;
+            // To deal with each tuple in a frame
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(recordDescriptors[0]);;
+            final FrameTupleAppender[] appenders = new FrameTupleAppender[numberOfNonMaterializedOutputs];
+            final FrameTupleReference tRef = new FrameTupleReference();;
+            final IBinaryIntegerInspector intInsepctor = intInsepctorFactory.createBinaryIntegerInspector(ctx);
+            final IScalarEvaluator eval;
+            try {
+                eval = brachingExprEvalFactory.createScalarEvaluator(ctx);
+            } catch (AlgebricksException ae) {
+                throw new HyracksDataException(ae);
+            }
+            for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            }
+
+            return new AbstractUnaryInputOperatorNodePushable() {
+                @Override
+                public void open() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    // Tuple based access
+                    accessor.reset(bufferAccessor);
+                    int tupleCount = accessor.getTupleCount();
+                    // The output branch number that starts from 0.
+                    int outputBranch;
+
+                    for (int i = 0; i < tupleCount; i++) {
+                        // Get the output branch number from the field in the given tuple.
+                        tRef.reset(accessor, i);
+                        try {
+                            eval.evaluate(tRef, p);
+                        } catch (AlgebricksException ae) {
+                            throw new HyracksDataException(ae);
+                        }
+                        outputBranch = intInsepctor.getIntegerValue(p.getByteArray(), p.getStartOffset(),
+                                p.getLength());
+
+                        // Add this tuple to the correct output frame.
+                        FrameUtils.appendToWriter(writers[outputBranch], appenders[outputBranch], accessor, i);
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                appenders[i].write(writers[i], true);
+                                writers[i].close();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
new file mode 100644
index 0000000..0ea8a88
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-0.tbl
@@ -0,0 +1,4 @@
+0,first branch1
+0,first branch2
+0,first branch3
+0,first branch4

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
new file mode 100644
index 0000000..53588ef
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1-split-1.tbl
@@ -0,0 +1,3 @@
+1,second branch1
+1,second branch2
+1,second branch3

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
new file mode 100644
index 0000000..ceb859a
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/simple/int-string-part1.tbl
@@ -0,0 +1,7 @@
+0|first branch1
+1|second branch1
+0|first branch2
+1|second branch2
+0|first branch3
+1|second branch3
+0|first branch4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index 020cffe..1276518 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -50,6 +50,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.NestedTupleSourceRunt
 import org.apache.hyracks.algebricks.runtime.operators.std.PrinterRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.RunningAggregateRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SplitOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
@@ -83,7 +84,7 @@ import org.apache.hyracks.dataflow.std.file.FileSplit;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.LineFileWriteOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.group.preclustered.PreclusteredGroupOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.sort.InMemorySortOperatorDescriptor;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -570,7 +571,7 @@ public class PushRuntimeTest {
     }
 
     @Test
-    public void scanSplitWrite() throws Exception {
+    public void scanReplicateWrite() throws Exception {
         final int outputArity = 2;
 
         JobSpecification spec = new JobSpecification(FRAME_SIZE);
@@ -596,7 +597,69 @@ public class PushRuntimeTest {
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
 
-        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
+        ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] {
+                    new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(outputFile[i])) });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+                    new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+        }
+    }
+
+    @Test
+    public void scanSplitWrite() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
+
+        String inputFileName[] = { "data/simple/int-string-part1.tbl", "data/simple/int-string-part1-split-0.tbl",
+                "data/simple/int-string-part1-split-1.tbl" };
+        File[] inputFiles = new File[inputFileName.length];
+        for (int i=0; i<inputFileName.length; i++) {
+            inputFiles[i] = new File(inputFileName[i]);
+        }
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("splitop", null);
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] {
+                new FileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, new FileReference(inputFiles[0])) };
+        IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits);
+
+        RecordDescriptor scannerDesc = new RecordDescriptor(
+                new ISerializerDeserializer[] { IntegerSerializerDeserializer.INSTANCE,
+                        new UTF8StringSerializerDeserializer() });
+
+        IValueParserFactory[] valueParsers = new IValueParserFactory[] { IntegerParserFactory.INSTANCE,
+                UTF8StringParserFactory.INSTANCE };
+
+        FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, scannerDesc, outputArity,
+                new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY);
 
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp,
                 new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
@@ -609,7 +672,7 @@ public class PushRuntimeTest {
                     new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
         }
 
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
+        spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, splitOp, 0);
         for (int i = 0; i < outputArity; i++) {
             spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
         }
@@ -620,7 +683,7 @@ public class PushRuntimeTest {
         AlgebricksHyracksIntegrationUtil.runJob(spec);
 
         for (int i = 0; i < outputArity; i++) {
-            compareFiles(inputFileName, outputFile[i].getAbsolutePath());
+            compareFiles(inputFileName[i + 1], outputFile[i].getAbsolutePath());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..5c642ba
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.base;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.TaskId;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
+
+/**
+ * Abstract class for two replication related operator descriptor - replicate and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor {
+    protected static final long serialVersionUID = 1L;
+
+    protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
+    protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
+
+    protected final boolean[] outputMaterializationFlags;
+    protected final boolean requiresMaterialization;
+    protected final int numberOfNonMaterializedOutputs;
+    protected final int numberOfMaterializedOutputs;
+
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
+            int outputArity, boolean[] outputMaterializationFlags) {
+        super(spec, 1, outputArity);
+        for (int i = 0; i < outputArity; i++) {
+            recordDescriptors[i] = rDesc;
+        }
+        this.outputMaterializationFlags = outputMaterializationFlags;
+
+        boolean reqMaterialization = false;
+        int matOutputs = 0;
+        int nonMatOutputs = 0;
+        for (boolean flag : outputMaterializationFlags) {
+            if (flag) {
+                reqMaterialization = true;
+                matOutputs++;
+            } else {
+                nonMatOutputs++;
+            }
+        }
+
+        this.requiresMaterialization = reqMaterialization;
+        this.numberOfMaterializedOutputs = matOutputs;
+        this.numberOfNonMaterializedOutputs = nonMatOutputs;
+
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        ReplicatorMaterializerActivityNode sma = new ReplicatorMaterializerActivityNode(
+                new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        int pipelineOutputIndex = 0;
+        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
+        for (int i = 0; i < outputArity; i++) {
+            if (outputMaterializationFlags[i]) {
+                MaterializeReaderActivityNode mra = new MaterializeReaderActivityNode(
+                        new ActivityId(odId, activityId++));
+                builder.addActivity(this, mra);
+                builder.addBlockingEdge(sma, mra);
+                builder.addTargetEdge(i, mra, 0);
+            } else {
+                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
+            }
+        }
+    }
+
+    protected class ReplicatorMaterializerActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public ReplicatorMaterializerActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryInputOperatorNodePushable() {
+                private MaterializerTaskState state;
+                private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
+                private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
+
+                @Override
+                public void open() throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
+                                new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);
+                        state.open(ctx);
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    if (requiresMaterialization) {
+                        state.appendFrame(bufferAccessor);
+                        bufferAccessor.clear();
+                    }
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
+                    }
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        writers[i].flush();
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    try {
+                        if (requiresMaterialization) {
+                            state.close();
+                            ctx.setStateObject(state);
+                        }
+                    } finally {
+                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    writers[i].close();
+                                } catch (Throwable th) {
+                                    if (hde == null) {
+                                        hde = new HyracksDataException(th);
+                                    } else {
+                                        hde.addSuppressed(th);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = new HyracksDataException(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+            };
+        }
+    }
+
+    protected class MaterializeReaderActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public MaterializeReaderActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            return new AbstractUnaryOutputSourceOperatorNodePushable() {
+
+                @Override
+                public void initialize() throws HyracksDataException {
+                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
+                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
+                    state.writeOut(writer, new VSizeFrame(ctx));
+                }
+
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
new file mode 100644
index 0000000..0782647
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ReplicateOperatorDescriptor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.std.misc;
+
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+
+public class ReplicateOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
+        this(spec, rDesc, outputArity, new boolean[outputArity]);
+    }
+
+    public ReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            boolean[] outputMaterializationFlags) {
+        super(spec, rDesc, outputArity, outputMaterializationFlags);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
deleted file mode 100644
index 67af861..0000000
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/SplitOperatorDescriptor.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.dataflow.std.misc;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hyracks.api.comm.IFrameWriter;
-import org.apache.hyracks.api.comm.VSizeFrame;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.api.dataflow.ActivityId;
-import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
-import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
-import org.apache.hyracks.api.dataflow.TaskId;
-import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
-import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
-import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
-import org.apache.hyracks.dataflow.std.base.AbstractOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
-import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
-
-public class SplitOperatorDescriptor extends AbstractOperatorDescriptor {
-    private static final long serialVersionUID = 1L;
-
-    private final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
-    private final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
-
-    private final boolean[] outputMaterializationFlags;
-    private final boolean requiresMaterialization;
-    private final int numberOfNonMaterializedOutputs;
-    private final int numberOfMaterializedOutputs;
-
-    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity) {
-        this(spec, rDesc, outputArity, new boolean[outputArity]);
-    }
-
-    public SplitOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
-            boolean[] outputMaterializationFlags) {
-        super(spec, 1, outputArity);
-        for (int i = 0; i < outputArity; i++) {
-            recordDescriptors[i] = rDesc;
-        }
-        this.outputMaterializationFlags = outputMaterializationFlags;
-
-        boolean reqMaterialization = false;
-        int matOutputs = 0;
-        int nonMatOutputs = 0;
-        for (boolean flag : outputMaterializationFlags) {
-            if (flag) {
-                reqMaterialization = true;
-                matOutputs++;
-            } else {
-                nonMatOutputs++;
-            }
-        }
-
-        this.requiresMaterialization = reqMaterialization;
-        this.numberOfMaterializedOutputs = matOutputs;
-        this.numberOfNonMaterializedOutputs = nonMatOutputs;
-
-    }
-
-    @Override
-    public void contributeActivities(IActivityGraphBuilder builder) {
-        SplitterMaterializerActivityNode sma =
-                new SplitterMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
-        builder.addActivity(this, sma);
-        builder.addSourceEdge(0, sma, 0);
-        int pipelineOutputIndex = 0;
-        int activityId = MATERIALIZE_READER_ACTIVITY_ID;
-        for (int i = 0; i < outputArity; i++) {
-            if (outputMaterializationFlags[i]) {
-                MaterializeReaderActivityNode mra =
-                        new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
-                builder.addActivity(this, mra);
-                builder.addBlockingEdge(sma, mra);
-                builder.addTargetEdge(i, mra, 0);
-            } else {
-                builder.addTargetEdge(i, sma, pipelineOutputIndex++);
-            }
-        }
-    }
-
-    private final class SplitterMaterializerActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public SplitterMaterializerActivityNode(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions) {
-            return new AbstractUnaryInputOperatorNodePushable() {
-                private MaterializerTaskState state;
-                private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
-                private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
-
-                @Override
-                public void open() throws HyracksDataException {
-                    if (requiresMaterialization) {
-                        state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
-                                new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);
-                        state.open(ctx);
-                    }
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        isOpen[i] = true;
-                        writers[i].open();
-                    }
-                }
-
-                @Override
-                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
-                    if (requiresMaterialization) {
-                        state.appendFrame(bufferAccessor);
-                        bufferAccessor.clear();
-                    }
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        FrameUtils.flushFrame(bufferAccessor, writers[i]);
-                    }
-                }
-
-                @Override
-                public void flush() throws HyracksDataException {
-                    if (!requiresMaterialization) {
-                        for (IFrameWriter writer : writers) {
-                            writer.flush();
-                        }
-                    }
-                }
-
-                @Override
-                public void close() throws HyracksDataException {
-                    HyracksDataException hde = null;
-                    try {
-                        if (requiresMaterialization) {
-                            state.close();
-                            ctx.setStateObject(state);
-                        }
-                    } finally {
-                        for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                            if (isOpen[i]) {
-                                try {
-                                    writers[i].close();
-                                } catch (Throwable th) {
-                                    if (hde == null) {
-                                        hde = new HyracksDataException(th);
-                                    } else {
-                                        hde.addSuppressed(th);
-                                    }
-                                }
-                            }
-                        }
-                    }
-                    if (hde != null) {
-                        throw hde;
-                    }
-                }
-
-                @Override
-                public void fail() throws HyracksDataException {
-                    HyracksDataException hde = null;
-                    for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
-                        if (isOpen[i]) {
-                            try {
-                                writers[i].fail();
-                            } catch (Throwable th) {
-                                if (hde == null) {
-                                    hde = new HyracksDataException(th);
-                                } else {
-                                    hde.addSuppressed(th);
-                                }
-                            }
-                        }
-                    }
-                    if (hde != null) {
-                        throw hde;
-                    }
-                }
-
-                @Override
-                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
-                    writers[index] = writer;
-                }
-            };
-        }
-    }
-
-    private final class MaterializeReaderActivityNode extends AbstractActivityNode {
-        private static final long serialVersionUID = 1L;
-
-        public MaterializeReaderActivityNode(ActivityId id) {
-            super(id);
-        }
-
-        @Override
-        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
-                final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
-                throws HyracksDataException {
-            return new AbstractUnaryOutputSourceOperatorNodePushable() {
-
-                @Override
-                public void initialize() throws HyracksDataException {
-                    MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
-                            new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
-                    state.writeOut(writer, new VSizeFrame(ctx));
-                }
-
-            };
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
new file mode 100644
index 0000000..9a56cd3
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/ReplicateOperatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.tests.integration;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
+import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
+import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
+import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
+import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
+import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
+import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReplicateOperatorTest extends AbstractIntegrationTest {
+
+    public void compareFiles(String fileNameA, String fileNameB) throws IOException {
+        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
+        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
+
+        String lineA, lineB;
+        while ((lineA = fileA.readLine()) != null) {
+            lineB = fileB.readLine();
+            Assert.assertEquals(lineA, lineB);
+        }
+        Assert.assertNull(fileB.readLine());
+        fileA.close();
+        fileB.close();
+    }
+
+    @Test
+    public void test() throws Exception {
+        final int outputArity = 2;
+
+        JobSpecification spec = new JobSpecification();
+
+        String inputFileName = "data/words.txt";
+        File[] outputFile = new File[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFile[i] = File.createTempFile("replicateop", null);
+            outputFile[i].deleteOnExit();
+        }
+
+        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
+
+        String[] locations = new String[] { NC1_ID };
+
+        DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
+                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
+        RecordDescriptor stringRec = new RecordDescriptor(
+                new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), });
+
+        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
+                inputSplits), stringParser, stringRec);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
+
+        ReplicateOperatorDescriptor replicateOp = new ReplicateOperatorDescriptor(spec, stringRec, outputArity);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, replicateOp, locations);
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            ResultSetId rsId = new ResultSetId(i);
+            spec.addResultSetId(rsId);
+
+            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
+                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, replicateOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), replicateOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        String[] expectedResultsFileNames = new String[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            expectedResultsFileNames[i] = inputFileName;
+        }
+        runTestAndCompareResults(spec, expectedResultsFileNames);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
deleted file mode 100644
index 40b4251..0000000
--- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/SplitOperatorTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.tests.integration;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
-import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-import org.apache.hyracks.api.dataset.ResultSetId;
-import org.apache.hyracks.api.job.JobSpecification;
-import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
-import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
-import org.apache.hyracks.dataflow.common.data.parsers.UTF8StringParserFactory;
-import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
-import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
-import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
-import org.apache.hyracks.dataflow.std.misc.SplitOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
-import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
-
-public class SplitOperatorTest extends AbstractIntegrationTest {
-
-    public void compareFiles(String fileNameA, String fileNameB) throws IOException {
-        BufferedReader fileA = new BufferedReader(new FileReader(fileNameA));
-        BufferedReader fileB = new BufferedReader(new FileReader(fileNameB));
-
-        String lineA, lineB;
-        while ((lineA = fileA.readLine()) != null) {
-            lineB = fileB.readLine();
-            Assert.assertEquals(lineA, lineB);
-        }
-        Assert.assertNull(fileB.readLine());
-        fileA.close();
-        fileB.close();
-    }
-
-    @Test
-    public void test() throws Exception {
-        final int outputArity = 2;
-
-        JobSpecification spec = new JobSpecification();
-
-        String inputFileName = "data/words.txt";
-        File[] outputFile = new File[outputArity];
-        for (int i = 0; i < outputArity; i++) {
-            outputFile[i] = File.createTempFile("splitop", null);
-            outputFile[i].deleteOnExit();
-        }
-
-        FileSplit[] inputSplits = new FileSplit[] { new FileSplit(NC1_ID, inputFileName) };
-
-        String[] locations = new String[] { NC1_ID };
-
-        DelimitedDataTupleParserFactory stringParser = new DelimitedDataTupleParserFactory(
-                new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE }, '\u0000');
-        RecordDescriptor stringRec = new RecordDescriptor(
-                new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), });
-
-        FileScanOperatorDescriptor scanOp = new FileScanOperatorDescriptor(spec, new ConstantFileSplitProvider(
-                inputSplits), stringParser, stringRec);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, scanOp, locations);
-
-        SplitOperatorDescriptor splitOp = new SplitOperatorDescriptor(spec, stringRec, outputArity);
-        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, splitOp, locations);
-
-        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
-        for (int i = 0; i < outputArity; i++) {
-            ResultSetId rsId = new ResultSetId(i);
-            spec.addResultSetId(rsId);
-
-            outputOp[i] = new ResultWriterOperatorDescriptor(spec, rsId, true, false,
-                    ResultSerializerFactoryProvider.INSTANCE.getResultSerializerFactoryProvider());
-            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i], locations);
-        }
-
-        spec.connect(new OneToOneConnectorDescriptor(spec), scanOp, 0, splitOp, 0);
-        for (int i = 0; i < outputArity; i++) {
-            spec.connect(new OneToOneConnectorDescriptor(spec), splitOp, i, outputOp[i], 0);
-        }
-
-        for (int i = 0; i < outputArity; i++) {
-            spec.addRoot(outputOp[i]);
-        }
-        String[] expectedResultsFileNames = new String[outputArity];
-        for (int i = 0; i < outputArity; i++) {
-            expectedResultsFileNames[i] = inputFileName;
-        }
-        runTestAndCompareResults(spec, expectedResultsFileNames);
-    }
-}


[2/2] asterixdb git commit: Index-only plan step 2: Added SplitOperator

Posted by wa...@apache.org.
Index-only plan step 2: Added SplitOperator

 - Introduced SplitOperator that sends a tuple to only one output frame unlike the ReplicateOperator
   that propagates a tuple into all outputs frames.
 - Removed PartitioningSplitOperator and PartitioningSplitOperatorDescriptor that are not functional
   (lacking physical operator)
 - Added a unit test case of SplitOperatorDescriptor in PushRuntimeTest.

Change-Id: Ice190827513cd8632764b52c9d0338d65c830740
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1196
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Yingyi Bu <bu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/76a4f9e3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/76a4f9e3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/76a4f9e3

Branch: refs/heads/master
Commit: 76a4f9e36e841d9325bedb3bb96e7e68c1d3f065
Parents: db1c115
Author: Taewoo Kim <wa...@yahoo.com>
Authored: Tue Oct 11 14:16:35 2016 -0700
Committer: Taewoo Kim <wa...@yahoo.com>
Committed: Tue Oct 11 17:08:54 2016 -0700

----------------------------------------------------------------------
 .../SweepIllegalNonfunctionalFunctions.java     |  11 +-
 .../subplan/InlineAllNtsInSubplanVisitor.java   |  11 +-
 ...neLeftNtsInSubplanJoinFlatteningVisitor.java |   7 +-
 .../SubplanSpecialFlatteningCheckVisitor.java   |  10 +-
 .../core/algebra/base/LogicalOperatorTag.java   |   2 +-
 .../core/algebra/base/PhysicalOperatorTag.java  |   4 +-
 .../logical/AbstractReplicateOperator.java      | 103 ++++++++
 .../logical/PartitioningSplitOperator.java      | 119 ----------
 .../operators/logical/ReplicateOperator.java    |  70 +-----
 .../operators/logical/SplitOperator.java        |  65 ++++++
 .../visitors/CardinalityInferenceVisitor.java   |   6 +-
 .../visitors/FDsAndEquivClassesVisitor.java     |  15 +-
 .../visitors/IsomorphismOperatorVisitor.java    |  19 +-
 .../IsomorphismVariableMappingVisitor.java      |   9 +-
 ...OperatorDeepCopyWithNewVariablesVisitor.java |  19 +-
 .../visitors/LogicalPropertiesVisitor.java      |  14 +-
 .../visitors/OperatorDeepCopyVisitor.java       |  15 +-
 .../visitors/PrimaryKeyVariablesVisitor.java    |   7 +-
 .../visitors/ProducedVariableVisitor.java       |  14 +-
 .../logical/visitors/SchemaVariableVisitor.java |  16 +-
 .../visitors/SubstituteVariableVisitor.java     |  21 +-
 .../logical/visitors/UsedVariableVisitor.java   |  18 +-
 .../physical/AbstractReplicatePOperator.java    |  68 ++++++
 .../operators/physical/ReplicatePOperator.java  |  47 +---
 .../operators/physical/SplitPOperator.java      |  68 ++++++
 .../LogicalOperatorPrettyPrintVisitor.java      |  20 +-
 .../visitors/ILogicalOperatorVisitor.java       |   8 +-
 ...placeNtsWithSubplanInputOperatorVisitor.java |   7 +-
 .../PartitioningSplitOperatorDescriptor.java    | 217 -----------------
 .../operators/std/SplitOperatorDescriptor.java  | 188 +++++++++++++++
 .../data/simple/int-string-part1-split-0.tbl    |   4 +
 .../data/simple/int-string-part1-split-1.tbl    |   3 +
 .../data/simple/int-string-part1.tbl            |   7 +
 .../tests/pushruntime/PushRuntimeTest.java      |  73 +++++-
 .../AbstractReplicateOperatorDescriptor.java    | 233 +++++++++++++++++++
 .../std/misc/ReplicateOperatorDescriptor.java   |  36 +++
 .../std/misc/SplitOperatorDescriptor.java       | 231 ------------------
 .../integration/ReplicateOperatorTest.java      | 115 +++++++++
 .../tests/integration/SplitOperatorTest.java    | 116 ---------
 39 files changed, 1083 insertions(+), 933 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index e04be6f..4a79387 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -44,19 +44,19 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -197,15 +197,12 @@ public class SweepIllegalNonfunctionalFunctions extends AbstractExtractExprRule
         }
 
         @Override
-        public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
-            for (Mutable<ILogicalExpression> expr : op.getExpressions()) {
-                sweepExpression(expr.getValue(), op);
-            }
+        public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
             return null;
         }
 
         @Override
-        public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 143cf0b..874cc7c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -26,8 +26,8 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.om.base.AString;
@@ -67,18 +67,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.LogicalOperatorDeepCopyWithNewVariablesVisitor;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
@@ -508,13 +508,12 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
     }
 
     @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
-            throws AlgebricksException {
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 
     @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index c7b927e..eeb2c2a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -53,12 +53,12 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeO
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -276,13 +276,12 @@ class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisit
     }
 
     @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
-            throws AlgebricksException {
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 
     @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index ef0f9da..ccf0aeb 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -33,17 +33,17 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -153,12 +153,12 @@ class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Bool
     }
 
     @Override
-    public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
-        return false;
+    public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
     }
 
     @Override
-    public Boolean visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+    public Boolean visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
         return visitInputs(op);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index d39a8c8..cc7a75f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -38,13 +38,13 @@ public enum LogicalOperatorTag {
     MATERIALIZE,
     NESTEDTUPLESOURCE,
     ORDER,
-    PARTITIONINGSPLIT,
     PROJECT,
     REPLICATE,
     RUNNINGAGGREGATE,
     SCRIPT,
     SELECT,
     SINK,
+    SPLIT,
     SUBPLAN,
     TOKENIZE,
     UNIONALL,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 893fb32..1d20e08 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -46,13 +46,13 @@ public enum PhysicalOperatorTag {
     NESTED_LOOP,
     NESTED_TUPLE_SOURCE,
     ONE_TO_ONE_EXCHANGE,
-    PARTITIONINGSPLIT,
     PRE_CLUSTERED_GROUP_BY,
     PRE_SORTED_DISTINCT_BY,
     RANDOM_PARTITION_EXCHANGE,
     RANDOM_MERGE_EXCHANGE,
     RANGE_PARTITION_EXCHANGE,
     RANGE_PARTITION_MERGE_EXCHANGE,
+    REPLICATE,
     RTREE_SEARCH,
     RUNNING_AGGREGATE,
     SINGLE_PARTITION_INVERTED_INDEX_SEARCH,
@@ -60,7 +60,7 @@ public enum PhysicalOperatorTag {
     SINK_WRITE,
     SORT_GROUP_BY,
     SORT_MERGE_EXCHANGE,
-    REPLICATE,
+    SPLIT,
     STABLE_SORT,
     STATS,
     STREAM_LIMIT,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
new file mode 100644
index 0000000..f883687
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/AbstractReplicateOperator.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
+import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+/**
+ * Abstract class for two replication related operator - replicate and split
+ * Replicate operator propagates all frames to all output branches.
+ * That is, each tuple will be propagated to all output branches.
+ * Split operator propagates each tuple in a frame to one output branch only.
+ */
+public abstract class AbstractReplicateOperator extends AbstractLogicalOperator {
+
+    private int outputArity;
+    protected boolean[] outputMaterializationFlags;
+    private List<Mutable<ILogicalOperator>> outputs;
+
+    public AbstractReplicateOperator(int outputArity) {
+        this.outputArity = outputArity;
+        this.outputMaterializationFlags = new boolean[outputArity];
+        this.outputs = new ArrayList<>();
+    }
+
+    public AbstractReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
+        this.outputArity = outputArity;
+        this.outputMaterializationFlags = outputMaterializationFlags;
+        this.outputs = new ArrayList<>();
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
+            throws AlgebricksException {
+        return false;
+    }
+
+    @Override
+    public VariablePropagationPolicy getVariablePropagationPolicy() {
+        return VariablePropagationPolicy.ALL;
+    }
+
+    @Override
+    public boolean isMap() {
+        return true;
+    }
+
+    @Override
+    public void recomputeSchema() {
+        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
+    }
+
+    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+        // do nothing
+    }
+
+    public int getOutputArity() {
+        return outputArity;
+    }
+
+    public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
+        this.outputMaterializationFlags = outputMaterializationFlags;
+    }
+
+    public boolean[] getOutputMaterializationFlags() {
+        return outputMaterializationFlags;
+    }
+
+    public List<Mutable<ILogicalOperator>> getOutputs() {
+        return outputs;
+    }
+
+    @Override
+    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
+        return createPropagatingAllInputsTypeEnvironment(ctx);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java
deleted file mode 100644
index 1c5f324..0000000
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/PartitioningSplitOperator.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.operators.logical;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
-import org.apache.hyracks.algebricks.runtime.operators.std.PartitioningSplitOperatorDescriptor;
-
-/**
- * Partitions it's input based on a given list of expressions.
- * Each expression is assumed to return true/false,
- * and there is exactly one output branch per expression (optionally, plus one default branch).
- * For each input tuple, the expressions are evaluated one-by-one,
- * and the tuple is written to first output branch whose corresponding
- * expression evaluates to true.
- * If all expressions evaluate to false, then
- * the tuple is written to the default output branch, if any was given.
- * If no output branch was given, then such tuples are simply dropped.
- * Given N expressions there may be N or N+1 output branches because the default output branch may be separate from the regular output branches.
- */
-public class PartitioningSplitOperator extends AbstractLogicalOperator {
-
-    private final List<Mutable<ILogicalExpression>> expressions;
-    private final int defaultBranchIndex;
-
-    public PartitioningSplitOperator(List<Mutable<ILogicalExpression>> expressions, int defaultBranchIndex) throws AlgebricksException {
-        this.expressions = expressions;
-        this.defaultBranchIndex = defaultBranchIndex;
-        // Check that the default output branch index is in [0, N], where N is the number of expressions.
-        if (defaultBranchIndex != PartitioningSplitOperatorDescriptor.NO_DEFAULT_BRANCH
-                && defaultBranchIndex > expressions.size()) {
-            throw new AlgebricksException("Default branch index out of bounds. Number of exprs given: "
-                    + expressions.size() + ". The maximum default branch index may therefore be: " + expressions.size());
-        }
-    }
-
-    public List<Mutable<ILogicalExpression>> getExpressions() {
-        return expressions;
-    }
-
-    public int getDefaultBranchIndex() {
-        return defaultBranchIndex;
-    }
-
-    public int getNumOutputBranches() {
-        return (defaultBranchIndex == expressions.size()) ? expressions.size() + 1 : expressions.size();
-    }
-
-    @Override
-    public LogicalOperatorTag getOperatorTag() {
-        return LogicalOperatorTag.PARTITIONINGSPLIT;
-    }
-
-    @Override
-    public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>();
-        schema.addAll(inputs.get(0).getValue().getSchema());
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
-        boolean b = false;
-        for (int i = 0; i < expressions.size(); i++) {
-            if (visitor.transform(expressions.get(i))) {
-                b = true;
-            }
-        }
-        return b;
-    }
-
-    @Override
-    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
-        return visitor.visitPartitioningSplitOperator(this, arg);
-    }
-
-    @Override
-    public boolean isMap() {
-        return false;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
index 834107c..2d2fd0f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/ReplicateOperator.java
@@ -18,36 +18,18 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.operators.logical;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
-import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import org.apache.hyracks.algebricks.core.algebra.properties.VariablePropagationPolicy;
-import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
-import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
-public class ReplicateOperator extends AbstractLogicalOperator {
-
-    private int outputArity;
-    private boolean[] outputMaterializationFlags;
-    private List<Mutable<ILogicalOperator>> outputs;
+public class ReplicateOperator extends AbstractReplicateOperator {
 
     public ReplicateOperator(int outputArity) {
-        this.outputArity = outputArity;
-        this.outputMaterializationFlags = new boolean[outputArity];
-        this.outputs = new ArrayList<>();
+        super(outputArity);
     }
 
     public ReplicateOperator(int outputArity, boolean[] outputMaterializationFlags) {
-        this.outputArity = outputArity;
-        this.outputMaterializationFlags = outputMaterializationFlags;
-        this.outputs = new ArrayList<>();
+        super(outputArity, outputMaterializationFlags);
     }
 
     @Override
@@ -60,52 +42,6 @@ public class ReplicateOperator extends AbstractLogicalOperator {
         return visitor.visitReplicateOperator(this, arg);
     }
 
-    @Override
-    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform transform)
-            throws AlgebricksException {
-        return false;
-    }
-
-    @Override
-    public VariablePropagationPolicy getVariablePropagationPolicy() {
-        return VariablePropagationPolicy.ALL;
-    }
-
-    @Override
-    public boolean isMap() {
-        return true;
-    }
-
-    @Override
-    public void recomputeSchema() {
-        schema = new ArrayList<LogicalVariable>(inputs.get(0).getValue().getSchema());
-    }
-
-    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
-        // do nothing
-    }
-
-    public int getOutputArity() {
-        return outputArity;
-    }
-
-    public void setOutputMaterializationFlags(boolean[] outputMaterializationFlags) {
-        this.outputMaterializationFlags = outputMaterializationFlags;
-    }
-
-    public boolean[] getOutputMaterializationFlags() {
-        return outputMaterializationFlags;
-    }
-
-    public List<Mutable<ILogicalOperator>> getOutputs() {
-        return outputs;
-    }
-
-    @Override
-    public IVariableTypeEnvironment computeOutputTypeEnvironment(ITypingContext ctx) throws AlgebricksException {
-        return createPropagatingAllInputsTypeEnvironment(ctx);
-    }
-
     public boolean isBlocker() {
         for (boolean requiresMaterialization : outputMaterializationFlags) {
             if (requiresMaterialization) {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
new file mode 100644
index 0000000..a996673
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SplitOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Split Operator receives an expression. This expression will be evaluated to an integer value during the runtime.
+ * Based on its value, it propagates each tuple to the corresponding output frame. (e.g., first output = 0, ...)
+ * Thus, unlike Replicate operator that does unconditional propagation to all outputs,
+ * this does a conditional propagate operation.
+ */
+public class SplitOperator extends AbstractReplicateOperator {
+
+    // Expression that keeps the output branch information for each tuple
+    private final Mutable<ILogicalExpression> branchingExpression;
+
+    public SplitOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression) {
+        super(outputArity);
+        this.branchingExpression = branchingExpression;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.SPLIT;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitSplitOperator(this, arg);
+    }
+
+    public Mutable<ILogicalExpression> getBranchingExpression() {
+        return branchingExpression;
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        return visitor.transform(branchingExpression);
+    }
+
+    @Override
+    public void substituteVar(LogicalVariable v1, LogicalVariable v2) {
+        getBranchingExpression().getValue().substituteVar(v1, v2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index b999493..d278078 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -48,13 +48,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -175,12 +175,12 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
     }
 
     @Override
-    public Long visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
+    public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
         return op.getInputs().get(0).getValue().accept(this, arg);
     }
 
     @Override
-    public Long visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+    public Long visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
         return op.getInputs().get(0).getValue().accept(this, arg);
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index 5e2a041..b259869 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -30,7 +30,6 @@ import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.ListSet;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.EquivalenceClass;
@@ -67,13 +66,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -405,12 +404,6 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
-            throws AlgebricksException {
-        throw new NotImplementedException();
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClassesForUsedVars(op, ctx, op.getVariables());
         return null;
@@ -423,6 +416,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index c0e8b34..7f34e8b 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -55,13 +55,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeO
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -277,24 +277,23 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
     }
 
     @Override
-    public Boolean visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
-            throws AlgebricksException {
+    public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.PARTITIONINGSPLIT) {
+        if (aop.getOperatorTag() != LogicalOperatorTag.REPLICATE) {
             return Boolean.FALSE;
         }
-        PartitioningSplitOperator partitionOpArg = (PartitioningSplitOperator) copyAndSubstituteVar(op, arg);
-        boolean isomorphic = compareExpressions(op.getExpressions(), partitionOpArg.getExpressions());
-        return isomorphic;
+        return Boolean.TRUE;
     }
 
     @Override
-    public Boolean visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Boolean visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
-        if (aop.getOperatorTag() != LogicalOperatorTag.REPLICATE) {
+        if (aop.getOperatorTag() != LogicalOperatorTag.SPLIT) {
             return Boolean.FALSE;
         }
-        return Boolean.TRUE;
+        SplitOperator sOpArg = (SplitOperator) copyAndSubstituteVar(op, arg);
+        boolean isomorphic = op.getBranchingExpression().getValue().equals(sOpArg.getBranchingExpression().getValue());
+        return isomorphic;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 965008c..58b31f8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -22,8 +22,8 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
@@ -57,13 +57,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -165,14 +165,13 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
-            throws AlgebricksException {
+    public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }
 
     @Override
-    public Void visitReplicateOperator(ReplicateOperator op, ILogicalOperator arg) throws AlgebricksException {
+    public Void visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index 4241d84..f4b3195 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -52,18 +52,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl;
 import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
@@ -384,14 +384,6 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     }
 
     @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, ILogicalOperator arg)
-            throws AlgebricksException {
-        PartitioningSplitOperator opCopy = new PartitioningSplitOperator(
-                exprDeepCopyVisitor.deepCopyExpressionReferenceList(op.getExpressions()), op.getDefaultBranchIndex());
-        return opCopy;
-    }
-
-    @Override
     public ILogicalOperator visitProjectOperator(ProjectOperator op, ILogicalOperator arg) throws AlgebricksException {
         ProjectOperator opCopy = new ProjectOperator(deepCopyVariableList(op.getVariables()));
         deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
@@ -410,6 +402,13 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
     }
 
     @Override
+    public ILogicalOperator visitSplitOperator(SplitOperator op, ILogicalOperator arg) throws AlgebricksException {
+        SplitOperator opCopy = new SplitOperator(op.getOutputArity(), op.getBranchingExpression());
+        deepCopyInputsAnnotationsAndExecutionMode(op, arg, opCopy);
+        return opCopy;
+    }
+
+    @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg)
             throws AlgebricksException {
         MaterializeOperator opCopy = new MaterializeOperator();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 8d3644d..7e92869 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -47,13 +47,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,6 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext arg)
-            throws AlgebricksException {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, IOptimizationContext context) throws AlgebricksException {
         propagateCardinalityAndFrameNumber(op, context);
         return null;
@@ -193,6 +186,11 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index fde6a28..442899f 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -52,13 +52,14 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -66,7 +67,6 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOpe
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
 import org.apache.hyracks.algebricks.core.algebra.util.OperatorManipulationUtil;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -167,16 +167,13 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
     }
 
     @Override
-    public ILogicalOperator visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg)
-            throws AlgebricksException {
-        ArrayList<Mutable<ILogicalExpression>> newExpressions = new ArrayList<>();
-        deepCopyExpressionRefs(newExpressions, op.getExpressions());
-        return new PartitioningSplitOperator(newExpressions, op.getDefaultBranchIndex());
+    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
+        return new ReplicateOperator(op.getOutputArity());
     }
 
     @Override
-    public ILogicalOperator visitReplicateOperator(ReplicateOperator op, Void arg) throws AlgebricksException {
-        return new ReplicateOperator(op.getOutputArity());
+    public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        return new SplitOperator(op.getOutputArity(), op.getBranchingExpression());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index f01b20f..9f1acea 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -49,13 +49,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperato
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -174,13 +174,12 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, IOptimizationContext ctx)
-            throws AlgebricksException {
+    public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
         return null;
     }
 
     @Override
-    public Void visitReplicateOperator(ReplicateOperator op, IOptimizationContext ctx) throws AlgebricksException {
+    public Void visitSplitOperator(SplitOperator op, IOptimizationContext arg) throws AlgebricksException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 10659b1..3645aff 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -48,18 +48,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -156,11 +156,6 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
-        return null;
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
         return null;
     }
@@ -255,6 +250,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index d35153a..a746cf2 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -46,18 +46,18 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -179,12 +179,6 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) throws AlgebricksException {
-        standardLayout(op);
-        return null;
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
         schemaVariables.addAll(op.getVariables());
         return null;
@@ -288,6 +282,12 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index f2da7c4..7345928 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -46,19 +46,19 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDelete
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -219,16 +219,6 @@ public class SubstituteVariableVisitor
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op,
-            Pair<LogicalVariable, LogicalVariable> pair) throws AlgebricksException {
-        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
-            e.getValue().substituteVar(pair.first, pair.second);
-        }
-        substVarTypes(op, pair);
-        return null;
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
         List<LogicalVariable> usedVariables = op.getVariables();
@@ -414,6 +404,13 @@ public class SubstituteVariableVisitor
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Pair<LogicalVariable, LogicalVariable> arg)
+            throws AlgebricksException {
+        op.substituteVar(arg.first, arg.second);
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> arg)
             throws AlgebricksException {
         return null;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 0d50dc2..cfe2a37 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -52,13 +52,13 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeO
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator.IOrder;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.PartitioningSplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
@@ -232,14 +232,6 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
-    public Void visitPartitioningSplitOperator(PartitioningSplitOperator op, Void arg) {
-        for (Mutable<ILogicalExpression> e : op.getExpressions()) {
-            e.getValue().getUsedVariables(usedVariables);
-        }
-        return null;
-    }
-
-    @Override
     public Void visitProjectOperator(ProjectOperator op, Void arg) {
         List<LogicalVariable> parameterVariables = op.getVariables();
         for (LogicalVariable v : parameterVariables) {
@@ -443,6 +435,14 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
     }
 
     @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
+        for (Mutable<ILogicalOperator> outputOp : op.getOutputs()) {
+            VariableUtilities.getUsedVariables(outputOp.getValue(), usedVariables);
+        }
+        return null;
+    }
+
+    @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/76a4f9e3/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
new file mode 100644
index 0000000..1d13163
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/AbstractReplicatePOperator.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractReplicateOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements;
+import org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+
+public abstract class AbstractReplicatePOperator extends AbstractPhysicalOperator {
+
+    @Override
+    public boolean isMicroOperator() {
+        return false;
+    }
+
+    @Override
+    public PhysicalRequirements getRequiredPropertiesForChildren(ILogicalOperator op,
+            IPhysicalPropertiesVector reqdByParent, IOptimizationContext context) {
+        return emptyUnaryRequirements();
+    }
+
+    @Override
+    public void computeDeliveredProperties(ILogicalOperator op, IOptimizationContext context) {
+        AbstractLogicalOperator op2 = (AbstractLogicalOperator) op.getInputs().get(0).getValue();
+        deliveredProperties = (StructuralPropertiesVector) op2.getDeliveredPhysicalProperties().clone();
+    }
+
+    @Override
+    public Pair<int[], int[]> getInputOutputDependencyLabels(ILogicalOperator op) {
+        int[] inputDependencyLabels = new int[] { 0 };
+        AbstractReplicateOperator rop = (AbstractReplicateOperator) op;
+        int[] outputDependencyLabels = new int[rop.getOutputArity()];
+        // change the labels of outputs that requires materialization to 1
+        boolean[] outputMaterializationFlags = rop.getOutputMaterializationFlags();
+        for (int i = 0; i < rop.getOutputArity(); i++) {
+            if (outputMaterializationFlags[i]) {
+                outputDependencyLabels[i] = 1;
+            }
+        }
+        return new Pair<>(inputDependencyLabels, outputDependencyLabels);
+    }
+
+    @Override
+    public boolean expensiveThanMaterialization() {
+        return false;
+    }
+}