You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by gg...@apache.org on 2023/01/06 18:04:23 UTC

[asterixdb] branch master updated: [NO ISSUE][HYR] Added switch operator

This is an automated email from the ASF dual-hosted git repository.

ggalvizo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new dd26fc456d [NO ISSUE][HYR] Added switch operator
dd26fc456d is described below

commit dd26fc456d525bc394e11b612fc15f016972af26
Author: Sushrut Borkar <su...@gmail.com>
AuthorDate: Tue Nov 29 19:20:11 2022 -0800

    [NO ISSUE][HYR] Added switch operator
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    details:
    
    - The switch operator propagates each input tuple to one
      or more output branches based on an output mapping given
      at compile time.
    
    Change-Id: Iba4f46a9d1ea7b8cadd7cca250d43576ad036438
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17294
    Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Glenn Galvizo <gg...@uci.edu>
---
 .../rules/SweepIllegalNonfunctionalFunctions.java  |   6 +
 .../rules/cbo/EstimatedCostComputationVisitor.java |   6 +
 .../OperatorValueAccessPushdownVisitor.java        |   9 +-
 .../subplan/InlineAllNtsInSubplanVisitor.java      |   6 +
 ...nlineLeftNtsInSubplanJoinFlatteningVisitor.java |   6 +
 .../SubplanSpecialFlatteningCheckVisitor.java      |   6 +
 .../asterix/app/resource/PlanStagesGenerator.java  |   8 +
 .../app/resource/RequiredCapacityVisitor.java      |   8 +
 .../core/algebra/base/LogicalOperatorTag.java      |   1 +
 .../core/algebra/base/PhysicalOperatorTag.java     |   1 +
 .../algebra/operators/logical/SwitchOperator.java  |  67 ++++++
 .../visitors/CardinalityInferenceVisitor.java      |   6 +
 .../visitors/FDsAndEquivClassesVisitor.java        |   7 +
 .../visitors/IsomorphismOperatorVisitor.java       |   8 +
 .../IsomorphismVariableMappingVisitor.java         |   7 +
 ...calOperatorDeepCopyWithNewVariablesVisitor.java |   8 +
 .../logical/visitors/LogicalPropertiesVisitor.java |   6 +
 .../logical/visitors/OperatorDeepCopyVisitor.java  |  10 +-
 .../visitors/PrimaryKeyVariablesVisitor.java       |   6 +
 .../logical/visitors/ProducedVariableVisitor.java  |   6 +
 .../logical/visitors/SchemaVariableVisitor.java    |   7 +
 .../visitors/SubstituteVariableVisitor.java        |   9 +
 .../logical/visitors/UsedVariableVisitor.java      |   8 +
 .../operators/physical/SwitchPOperator.java        |  71 +++++++
 .../LogicalOperatorPrettyPrintVisitor.java         |   8 +
 .../LogicalOperatorPrettyPrintVisitorJson.java     |   8 +
 .../algebra/visitors/ILogicalOperatorVisitor.java  |   3 +
 ...LogicalExpressionReferenceTransformVisitor.java |   7 +
 .../core/utils/LogicalOperatorDotVisitor.java      |   8 +
 .../rules/SetAlgebricksPhysicalOperatorsRule.java  |   7 +
 .../rewriter/rules/SetMemoryRequirementsRule.java  |   6 +
 .../ReplaceNtsWithSubplanInputOperatorVisitor.java |   6 +
 .../operators/std/SwitchOperatorDescriptor.java    | 228 +++++++++++++++++++++
 .../data/simple/int-string-part2-switch-0.tbl      |   6 +
 .../data/simple/int-string-part2-switch-1.tbl      |   7 +
 .../data/simple/int-string-part2-switch-2.tbl      |   7 +
 .../data/device0/data/simple/int-string-part2.tbl  |  10 +
 .../tests/pushruntime/PushRuntimeTest.java         |  72 +++++++
 .../base/AbstractReplicateOperatorDescriptor.java  |   3 +-
 39 files changed, 666 insertions(+), 3 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
index 2509010c96..2a6604ecfa 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SweepIllegalNonfunctionalFunctions.java
@@ -61,6 +61,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -204,6 +205,11 @@ public class SweepIllegalNonfunctionalFunctions implements IAlgebraicRewriteRule
             return null;
         }
 
+        @Override
+        public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
         @Override
         public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
             return null;
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EstimatedCostComputationVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EstimatedCostComputationVisitor.java
index f9f34bf0d7..b9e295c430 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EstimatedCostComputationVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/cbo/EstimatedCostComputationVisitor.java
@@ -54,6 +54,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -168,6 +169,11 @@ public class EstimatedCostComputationVisitor implements ILogicalOperatorVisitor<
         return annotate(this, op, arg);
     }
 
+    @Override
+    public Pair<Double, Double> visitSwitchOperator(SwitchOperator op, Double arg) throws AlgebricksException {
+        return annotate(this, op, arg);
+    }
+
     @Override
     public Pair<Double, Double> visitMaterializeOperator(MaterializeOperator op, Double arg)
             throws AlgebricksException {
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
index 6739384ad0..05bc1614e8 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
@@ -74,6 +74,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -361,6 +362,12 @@ public class OperatorValueAccessPushdownVisitor implements ILogicalOperatorVisit
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         visitInputs(op);
@@ -474,4 +481,4 @@ public class OperatorValueAccessPushdownVisitor implements ILogicalOperatorVisit
     private void visitInputs(ILogicalOperator op) throws AlgebricksException {
         visitInputs(op, null);
     }
-}
\ No newline at end of file
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
index 4ac44b400f..55165ede4a 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineAllNtsInSubplanVisitor.java
@@ -80,6 +80,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -569,6 +570,11 @@ class InlineAllNtsInSubplanVisitor implements IQueryOperatorVisitor<ILogicalOper
         return visitSingleInputOperator(op);
     }
 
+    @Override
+    public ILogicalOperator visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
     @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
index d4b7853daa..cd476f9e34 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/InlineLeftNtsInSubplanJoinFlatteningVisitor.java
@@ -63,6 +63,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -296,6 +297,11 @@ class InlineLeftNtsInSubplanJoinFlatteningVisitor implements IQueryOperatorVisit
         return visitSingleInputOperator(op);
     }
 
+    @Override
+    public ILogicalOperator visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return visitSingleInputOperator(op);
+    }
+
     @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return visitSingleInputOperator(op);
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
index a0bb4b6487..8bb7502d0c 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/subplan/SubplanSpecialFlatteningCheckVisitor.java
@@ -48,6 +48,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -166,6 +167,11 @@ class SubplanSpecialFlatteningCheckVisitor implements IQueryOperatorVisitor<Bool
         return visitInputs(op);
     }
 
+    @Override
+    public Boolean visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return visitInputs(op);
+    }
+
     @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return visitInputs(op);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
index 25e51bb545..5d74c13548 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/PlanStagesGenerator.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
@@ -59,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -196,6 +198,12 @@ public class PlanStagesGenerator implements ILogicalOperatorVisitor<Void, Void>
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         visit(op);
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
index 024a13e8e1..af383c36de 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/resource/RequiredCapacityVisitor.java
@@ -24,6 +24,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
@@ -57,6 +58,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -192,6 +194,12 @@ public class RequiredCapacityVisitor implements ILogicalOperatorVisitor<Void, Vo
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         visitInternal(op, true);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
index 4466408a2d..9c41cb9ea5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/LogicalOperatorTag.java
@@ -46,6 +46,7 @@ public enum LogicalOperatorTag {
     SELECT,
     SINK,
     SPLIT,
+    SWITCH,
     SUBPLAN,
     TOKENIZE,
     UNIONALL,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
index 4a900af22e..c052d58c84 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/PhysicalOperatorTag.java
@@ -68,6 +68,7 @@ public enum PhysicalOperatorTag {
     SORT_MERGE_EXCHANGE,
     SPATIAL_JOIN,
     SPLIT,
+    SWITCH,
     STABLE_SORT,
     STATS,
     STREAM_LIMIT,
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SwitchOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SwitchOperator.java
new file mode 100644
index 0000000000..4b1f260345
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/SwitchOperator.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.logical;
+
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * Switch Operator receives an expression and an output mapping. We evaluate the expression during runtime and look up
+ * the result in the output mapping. Based on this, we propagate each tuple to the corresponding output branch(es).
+ */
+public class SwitchOperator extends AbstractReplicateOperator {
+
+    // Expression containing the index of the relevant field
+    private final Mutable<ILogicalExpression> branchingExpression;
+
+    // The supplied mapping from field values to arrays of output branch numbers
+    private final Map<Integer, int[]> outputMapping;
+
+    public SwitchOperator(int outputArity, Mutable<ILogicalExpression> branchingExpression,
+            Map<Integer, int[]> outputMapping) {
+        super(outputArity);
+        this.branchingExpression = branchingExpression;
+        this.outputMapping = outputMapping;
+    }
+
+    @Override
+    public LogicalOperatorTag getOperatorTag() {
+        return LogicalOperatorTag.SWITCH;
+    }
+
+    @Override
+    public <R, T> R accept(ILogicalOperatorVisitor<R, T> visitor, T arg) throws AlgebricksException {
+        return visitor.visitSwitchOperator(this, arg);
+    }
+
+    public Mutable<ILogicalExpression> getBranchingExpression() {
+        return branchingExpression;
+    }
+
+    public Map<Integer, int[]> getOutputMapping() {
+        return outputMapping;
+    }
+
+    @Override
+    public boolean acceptExpressionTransform(ILogicalExpressionReferenceTransform visitor) throws AlgebricksException {
+        return visitor.transform(branchingExpression);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
index 6ed90a5930..14221d63e3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/CardinalityInferenceVisitor.java
@@ -58,6 +58,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -195,6 +196,11 @@ public class CardinalityInferenceVisitor implements ILogicalOperatorVisitor<Long
         return op.getInputs().get(0).getValue().accept(this, arg);
     }
 
+    @Override
+    public Long visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return op.getInputs().get(0).getValue().accept(this, arg);
+    }
+
     @Override
     public Long visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return op.getInputs().get(0).getValue().accept(this, arg);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
index f3717c82be..5937d9d651 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/FDsAndEquivClassesVisitor.java
@@ -74,6 +74,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -426,6 +427,12 @@ public class FDsAndEquivClassesVisitor implements ILogicalOperatorVisitor<Void,
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, IOptimizationContext ctx) throws AlgebricksException {
+        propagateFDsAndEquivClasses(op, ctx);
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
         propagateFDsAndEquivClasses(op, ctx);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
index e4df39724d..ccd152f46e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismOperatorVisitor.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -65,6 +66,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -308,6 +310,12 @@ public class IsomorphismOperatorVisitor implements ILogicalOperatorVisitor<Boole
         return isomorphic;
     }
 
+    @Override
+    public Boolean visitSwitchOperator(SwitchOperator op, ILogicalOperator arg) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
         AbstractLogicalOperator aop = (AbstractLogicalOperator) arg;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
index 8ca2b83165..e4d6586424 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/IsomorphismVariableMappingVisitor.java
@@ -65,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -204,6 +205,12 @@ public class IsomorphismVariableMappingVisitor implements ILogicalOperatorVisito
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, ILogicalOperator arg) throws AlgebricksException {
+        mapVariablesStandard(op, arg);
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg) throws AlgebricksException {
         mapVariablesStandard(op, arg);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
index e242531782..08acf13d5a 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalOperatorDeepCopyWithNewVariablesVisitor.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -64,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -446,6 +448,12 @@ public class LogicalOperatorDeepCopyWithNewVariablesVisitor
         return opCopy;
     }
 
+    @Override
+    public ILogicalOperator visitSwitchOperator(SwitchOperator op, ILogicalOperator arg) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, ILogicalOperator arg)
             throws AlgebricksException {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
index 5a566eeeb7..3a88d2c430 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/LogicalPropertiesVisitor.java
@@ -57,6 +57,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -193,6 +194,11 @@ public class LogicalPropertiesVisitor implements ILogicalOperatorVisitor<Void, I
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext arg) throws AlgebricksException {
         // TODO Auto-generated method stub
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
index c2ee661fd3..8ef0b5b18d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/OperatorDeepCopyVisitor.java
@@ -20,6 +20,7 @@
 package org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
@@ -63,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -176,7 +178,13 @@ public class OperatorDeepCopyVisitor implements ILogicalOperatorVisitor<ILogical
 
     @Override
     public ILogicalOperator visitSplitOperator(SplitOperator op, Void arg) throws AlgebricksException {
-        return new SplitOperator(op.getOutputArity(), op.getBranchingExpression());
+        return new SplitOperator(op.getOutputArity(), deepCopyExpressionRef(op.getBranchingExpression()));
+    }
+
+    @Override
+    public ILogicalOperator visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return new SwitchOperator(op.getOutputArity(), deepCopyExpressionRef(op.getBranchingExpression()),
+                new HashMap<>(op.getOutputMapping()));
     }
 
     @Override
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
index eb9028817e..8a3a88594d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/PrimaryKeyVariablesVisitor.java
@@ -58,6 +58,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -185,6 +186,11 @@ public class PrimaryKeyVariablesVisitor implements ILogicalOperatorVisitor<Void,
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, IOptimizationContext arg) throws AlgebricksException {
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, IOptimizationContext ctx) throws AlgebricksException {
         return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
index 5d9d7895fa..ff50994d8e 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/ProducedVariableVisitor.java
@@ -62,6 +62,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -270,6 +271,11 @@ public class ProducedVariableVisitor implements ILogicalOperatorVisitor<Void, Vo
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
index 44bb7e2f82..e7d6a92cd3 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SchemaVariableVisitor.java
@@ -60,6 +60,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -289,6 +290,12 @@ public class SchemaVariableVisitor implements ILogicalOperatorVisitor<Void, Void
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        standardLayout(op);
+        return null;
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         standardLayout(op);
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
index 439e49365c..cf8196c4e8 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/SubstituteVariableVisitor.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -63,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -409,6 +411,13 @@ public class SubstituteVariableVisitor
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Pair<LogicalVariable, LogicalVariable> pair)
+            throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Pair<LogicalVariable, LogicalVariable> pair)
             throws AlgebricksException {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
index 174b18476c..4c994b5999 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/UsedVariableVisitor.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -64,6 +65,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -472,6 +474,12 @@ public class UsedVariableVisitor implements ILogicalOperatorVisitor<Void, Void>
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SwitchPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SwitchPOperator.java
new file mode 100644
index 0000000000..4b587883b1
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SwitchPOperator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.operators.physical;
+
+import java.util.Map;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SwitchOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+
+public class SwitchPOperator extends AbstractReplicatePOperator {
+
+    @Override
+    public PhysicalOperatorTag getOperatorTag() {
+        return PhysicalOperatorTag.SWITCH;
+    }
+
+    @Override
+    public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op,
+            IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema)
+            throws AlgebricksException {
+        SwitchOperator sop = (SwitchOperator) op;
+        int outputArity = sop.getOutputArity();
+        Map<Integer, int[]> outputMapping = sop.getOutputMapping();
+
+        IOperatorDescriptorRegistry spec = builder.getJobSpec();
+        RecordDescriptor recDescriptor =
+                JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context);
+
+        IExpressionRuntimeProvider expressionRuntimeProvider = context.getExpressionRuntimeProvider();
+        IScalarEvaluatorFactory branchingExprEvalFactory = expressionRuntimeProvider.createEvaluatorFactory(
+                sop.getBranchingExpression().getValue(), context.getTypeEnvironment(op), inputSchemas, context);
+
+        IBinaryIntegerInspectorFactory intInspectorFactory = context.getBinaryIntegerInspectorFactory();
+
+        SwitchOperatorDescriptor sopDesc = new SwitchOperatorDescriptor(spec, recDescriptor, outputArity,
+                branchingExprEvalFactory, intInspectorFactory, outputMapping);
+        sopDesc.setSourceLocation(sop.getSourceLocation());
+
+        contributeOpDesc(builder, sop, sopDesc);
+        ILogicalOperator src = op.getInputs().get(0).getValue();
+        builder.contributeGraphEdge(src, 0, op, 0);
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
index e3f9eebbc6..8776f2ea03 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitor.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -66,6 +67,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -439,6 +441,12 @@ public class LogicalOperatorPrettyPrintVisitor extends AbstractLogicalOperatorPr
         return null;
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Integer indent) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Integer indent) throws AlgebricksException {
         addIndent(indent).append("materialize");
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
index fa98091a16..51980652f5 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/prettyprint/LogicalOperatorPrettyPrintVisitorJson.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -73,6 +74,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -682,6 +684,12 @@ public class LogicalOperatorPrettyPrintVisitorJson extends AbstractLogicalOperat
         }
     }
 
+    @Override
+    public Void visitSwitchOperator(SwitchOperator op, Void indent) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public Void visitMaterializeOperator(MaterializeOperator op, Void indent) throws AlgebricksException {
         try {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
index d52183142a..4eb6494e94 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/ILogicalOperatorVisitor.java
@@ -48,6 +48,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -88,6 +89,8 @@ public interface ILogicalOperatorVisitor<R, T> {
 
     public R visitSplitOperator(SplitOperator op, T arg) throws AlgebricksException;
 
+    public R visitSwitchOperator(SwitchOperator op, T arg) throws AlgebricksException;
+
     public R visitMaterializeOperator(MaterializeOperator op, T arg) throws AlgebricksException;
 
     public R visitScriptOperator(ScriptOperator op, T arg) throws AlgebricksException;
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
index 9350f954b7..f1613a59d6 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/visitors/LogicalExpressionReferenceTransformVisitor.java
@@ -50,6 +50,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -165,6 +166,12 @@ public abstract class LogicalExpressionReferenceTransformVisitor
         return visitOperator(op, arg);
     }
 
+    @Override
+    public Boolean visitSwitchOperator(SwitchOperator op, ILogicalExpressionReferenceTransform arg)
+            throws AlgebricksException {
+        return visitOperator(op, arg);
+    }
+
     @Override
     public Boolean visitMaterializeOperator(MaterializeOperator op, ILogicalExpressionReferenceTransform arg)
             throws AlgebricksException {
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
index ee7f7aaaab..75da5ffc14 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/utils/LogicalOperatorDotVisitor.java
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.common.utils.Triple;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
@@ -63,6 +64,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -483,6 +485,12 @@ public class LogicalOperatorDotVisitor implements ILogicalOperatorVisitor<String
         return stringBuilder.toString();
     }
 
+    @Override
+    public String visitSwitchOperator(SwitchOperator op, Boolean showDetails) throws AlgebricksException {
+        // TODO (GLENN): Implement this logic
+        throw new NotImplementedException();
+    }
+
     @Override
     public String visitMaterializeOperator(MaterializeOperator op, Boolean showDetails) throws AlgebricksException {
         stringBuilder.setLength(0);
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
index a6fe4951a7..a1aa01a42b 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetAlgebricksPhysicalOperatorsRule.java
@@ -73,6 +73,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -112,6 +113,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamProje
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StreamSelectPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.StringStreamingScriptPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.SubplanPOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.physical.SwitchPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.TokenizePOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnionAllPOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.physical.UnnestPOperator;
@@ -308,6 +310,11 @@ public class SetAlgebricksPhysicalOperatorsRule implements IAlgebraicRewriteRule
             return new SplitPOperator();
         }
 
+        @Override
+        public IPhysicalOperator visitSwitchOperator(SwitchOperator op, Boolean topLevelOp) {
+            return new SwitchPOperator();
+        }
+
         @Override
         public IPhysicalOperator visitScriptOperator(ScriptOperator op, Boolean topLevelOp) {
             return new StringStreamingScriptPOperator();
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
index 3b0e9fbc69..b3748ddf3c 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/SetMemoryRequirementsRule.java
@@ -59,6 +59,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -271,6 +272,11 @@ public class SetMemoryRequirementsRule implements IAlgebraicRewriteRule {
             return null;
         }
 
+        @Override
+        public Void visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+            return null;
+        }
+
         @Override
         public Void visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
             return null;
diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
index 1388ccb28d..f4f9eda525 100644
--- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
+++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/subplan/ReplaceNtsWithSubplanInputOperatorVisitor.java
@@ -56,6 +56,7 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperat
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import org.apache.hyracks.algebricks.core.algebra.operators.logical.SwitchOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
@@ -185,6 +186,11 @@ class ReplaceNtsWithSubplanInputOperatorVisitor implements IQueryOperatorVisitor
         return visit(op);
     }
 
+    @Override
+    public ILogicalOperator visitSwitchOperator(SwitchOperator op, Void arg) throws AlgebricksException {
+        return visit(op);
+    }
+
     @Override
     public ILogicalOperator visitMaterializeOperator(MaterializeOperator op, Void arg) throws AlgebricksException {
         return visit(op);
diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SwitchOperatorDescriptor.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SwitchOperatorDescriptor.java
new file mode 100644
index 0000000000..0fbb1d59ae
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/std/SwitchOperatorDescriptor.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.runtime.operators.std;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
+import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.ActivityId;
+import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.CleanupUtils;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
+import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.std.base.AbstractActivityNode;
+import org.apache.hyracks.dataflow.std.base.AbstractReplicateOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputOperatorNodePushable;
+
+/**
+ * The switch operator propagates each tuple of the input to one or more output branches based on a given output mapping.
+ * For each tuple, we peek at the value of a given field f. We look up the value of the field in the supplied
+ * output mapping and propagate the tuple to all corresponding output branches.
+ */
+public class SwitchOperatorDescriptor extends AbstractReplicateOperatorDescriptor {
+    private static final long serialVersionUID = 1L;
+    final static int SWITCHER_ACTIVITY_ID = 0;
+
+    private final IScalarEvaluatorFactory branchingExprEvalFactory;
+    private final IBinaryIntegerInspectorFactory intInspectorFactory;
+    private final Map<Integer, int[]> outputMapping;
+
+    /**
+     * @param spec
+     * @param rDesc
+     * @param outputArity equal to the number of non-materialized outputs
+     * @param branchingExprEvalFactory containing the index of the relevant field f
+     * @param intInspectorFactory
+     * @param outputMapping the supplied mapping from field values to arrays of output branch numbers
+     */
+    public SwitchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc, int outputArity,
+            IScalarEvaluatorFactory branchingExprEvalFactory, IBinaryIntegerInspectorFactory intInspectorFactory,
+            Map<Integer, int[]> outputMapping) {
+        super(spec, rDesc, outputArity);
+        if (outputArity != numberOfNonMaterializedOutputs) {
+            throw new IllegalArgumentException();
+        }
+        this.branchingExprEvalFactory = branchingExprEvalFactory;
+        this.intInspectorFactory = intInspectorFactory;
+        this.outputMapping = outputMapping;
+    }
+
+    @Override
+    public void contributeActivities(IActivityGraphBuilder builder) {
+        SwitcherActivityNode sma = new SwitcherActivityNode(new ActivityId(odId, SWITCHER_ACTIVITY_ID));
+        builder.addActivity(this, sma);
+        builder.addSourceEdge(0, sma, 0);
+        for (int i = 0; i < outputArity; i++) {
+            builder.addTargetEdge(i, sma, i);
+        }
+    }
+
+    private final class SwitcherActivityNode extends AbstractActivityNode {
+        private static final long serialVersionUID = 1L;
+
+        public SwitcherActivityNode(ActivityId id) {
+            super(id);
+        }
+
+        @Override
+        public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
+                IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
+                throws HyracksDataException {
+            final IFrameWriter[] writers = new IFrameWriter[outputArity];
+            final boolean[] isOpen = new boolean[outputArity];
+            final IPointable p = VoidPointable.FACTORY.createPointable();
+            // To deal with each tuple in a frame
+            final FrameTupleAccessor accessor = new FrameTupleAccessor(outRecDescs[0]);
+            final FrameTupleAppender[] appenders = new FrameTupleAppender[outputArity];
+            final FrameTupleReference tRef = new FrameTupleReference();
+            final IBinaryIntegerInspector intInspector = intInspectorFactory.createBinaryIntegerInspector(ctx);
+            final IEvaluatorContext evalCtx = new EvaluatorContext(ctx);
+            final IScalarEvaluator eval = branchingExprEvalFactory.createScalarEvaluator(evalCtx);
+            final MutableBoolean hasFailed = new MutableBoolean(false);
+            for (int i = 0; i < outputArity; i++) {
+                appenders[i] = new FrameTupleAppender(new VSizeFrame(ctx), true);
+            }
+
+            return new AbstractUnaryInputOperatorNodePushable() {
+                @Override
+                public void open() throws HyracksDataException {
+                    for (int i = 0; i < outputArity; i++) {
+                        isOpen[i] = true;
+                        writers[i].open();
+                    }
+                }
+
+                @Override
+                public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
+                    // Tuple based access
+                    accessor.reset(bufferAccessor);
+                    int tupleCount = accessor.getTupleCount();
+
+                    for (int i = 0; i < tupleCount; i++) {
+                        // Get the value of the relevant field in the given tuple.
+                        tRef.reset(accessor, i);
+                        eval.evaluate(tRef, p);
+                        int fieldValue =
+                                intInspector.getIntegerValue(p.getByteArray(), p.getStartOffset(), p.getLength());
+
+                        // Look up the corresponding output branches based on the given mapping
+                        int[] outputBranches = outputMapping.get(fieldValue);
+
+                        // Propagate to each output branch
+                        for (int j : outputBranches) {
+                            FrameUtils.appendToWriter(writers[j], appenders[j], accessor, i);
+                        }
+                    }
+                }
+
+                @Override
+                public void close() throws HyracksDataException {
+                    Throwable hde = null;
+                    // write if hasn't failed
+                    if (!hasFailed.booleanValue()) {
+                        for (int i = 0; i < outputArity; i++) {
+                            if (isOpen[i]) {
+                                try {
+                                    appenders[i].write(writers[i], true);
+                                } catch (Throwable th) {
+                                    hde = th;
+                                    break;
+                                }
+                            }
+                        }
+                    }
+
+                    // fail the writers
+                    if (hde != null) {
+                        for (int i = 0; i < outputArity; i++) {
+                            if (isOpen[i]) {
+                                CleanupUtils.fail(writers[i], hde);
+                            }
+                        }
+                    }
+
+                    // close
+                    for (int i = 0; i < outputArity; i++) {
+                        if (isOpen[i]) {
+                            hde = CleanupUtils.close(writers[i], hde);
+                        }
+                    }
+                    if (hde != null) {
+                        throw HyracksDataException.create(hde);
+                    }
+                }
+
+                @Override
+                public void fail() throws HyracksDataException {
+                    hasFailed.setTrue();
+                    HyracksDataException hde = null;
+                    for (int i = 0; i < outputArity; i++) {
+                        if (isOpen[i]) {
+                            try {
+                                writers[i].fail();
+                            } catch (Throwable th) {
+                                if (hde == null) {
+                                    hde = HyracksDataException.create(th);
+                                } else {
+                                    hde.addSuppressed(th);
+                                }
+                            }
+                        }
+                    }
+                    if (hde != null) {
+                        throw hde;
+                    }
+                }
+
+                @Override
+                public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
+                    writers[index] = writer;
+                }
+
+                @Override
+                public void flush() throws HyracksDataException {
+                    for (int i = 0; i < outputArity; i++) {
+                        if (isOpen[i]) {
+                            appenders[i].flush(writers[i]);
+                        }
+                    }
+                }
+            };
+        }
+    }
+}
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-0.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-0.tbl
new file mode 100644
index 0000000000..026bbdb956
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-0.tbl
@@ -0,0 +1,6 @@
+1,second branch1
+2,third branch1
+1,second branch2
+2,third branch2
+1,second branch3
+2,third branch3
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-1.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-1.tbl
new file mode 100644
index 0000000000..464a6448e6
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-1.tbl
@@ -0,0 +1,7 @@
+0,first branch1
+2,third branch1
+0,first branch2
+2,third branch2
+0,first branch3
+2,third branch3
+0,first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-2.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-2.tbl
new file mode 100644
index 0000000000..b9eabbcac7
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2-switch-2.tbl
@@ -0,0 +1,7 @@
+0,first branch1
+1,second branch1
+0,first branch2
+1,second branch2
+0,first branch3
+1,second branch3
+0,first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2.tbl b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2.tbl
new file mode 100644
index 0000000000..373cd0fea2
--- /dev/null
+++ b/hyracks-fullstack/algebricks/algebricks-tests/data/device0/data/simple/int-string-part2.tbl
@@ -0,0 +1,10 @@
+0|first branch1
+1|second branch1
+2|third branch1
+0|first branch2
+1|second branch2
+2|third branch2
+0|first branch3
+1|second branch3
+2|third branch3
+0|first branch4
diff --git a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
index aeb22b6871..d23f7f9ccd 100644
--- a/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
+++ b/hyracks-fullstack/algebricks/algebricks-tests/src/test/java/org/apache/hyracks/algebricks/tests/pushruntime/PushRuntimeTest.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.io.FileUtils;
@@ -58,6 +59,7 @@ import org.apache.hyracks.algebricks.runtime.operators.std.StreamLimitRuntimeFac
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamProjectRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StreamSelectRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.operators.std.StringStreamingRuntimeFactory;
+import org.apache.hyracks.algebricks.runtime.operators.std.SwitchOperatorDescriptor;
 import org.apache.hyracks.algebricks.runtime.operators.std.UnnestRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory;
 import org.apache.hyracks.algebricks.tests.util.AlgebricksHyracksIntegrationUtil;
@@ -697,6 +699,76 @@ public class PushRuntimeTest {
         }
     }
 
+    @Test
+    public void scanSwitchWrite() throws Exception {
+        final int outputArity = 3;
+
+        JobSpecification spec = new JobSpecification(FRAME_SIZE);
+
+        String inputFileName[] = { "data" + File.separator + "simple" + File.separator + "int-string-part2.tbl",
+                "data" + File.separator + "simple" + File.separator + "int-string-part2-switch-0.tbl",
+                "data" + File.separator + "simple" + File.separator + "int-string-part2-switch-1.tbl",
+                "data" + File.separator + "simple" + File.separator + "int-string-part2-switch-2.tbl" };
+        File[] inputFiles = new File[inputFileName.length];
+        for (int i = 0; i < inputFileName.length; i++) {
+            inputFiles[i] = new File(inputFileName[i]);
+        }
+        File[] outputFile = new File[outputArity];
+        FileSplit[] outputFileSplit = new FileSplit[outputArity];
+        for (int i = 0; i < outputArity; i++) {
+            outputFileSplit[i] = createFile(AlgebricksHyracksIntegrationUtil.nc1);
+            outputFile[i] = outputFileSplit[i].getFile(AlgebricksHyracksIntegrationUtil.nc1.getIoManager());
+        }
+
+        FileSplit[] inputSplits =
+                new FileSplit[] { new ManagedFileSplit(AlgebricksHyracksIntegrationUtil.NC1_ID, inputFileName[0]) };
+        IFileSplitProvider intSplitProvider = new ConstantFileSplitProvider(inputSplits);
+
+        RecordDescriptor scannerDesc = new RecordDescriptor(new ISerializerDeserializer[] {
+                IntegerSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() });
+
+        IValueParserFactory[] valueParsers =
+                new IValueParserFactory[] { IntegerParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE };
+
+        FileScanOperatorDescriptor intScanner = new FileScanOperatorDescriptor(spec, intSplitProvider,
+                new DelimitedDataTupleParserFactory(valueParsers, '|'), scannerDesc);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, intScanner, DEFAULT_NODES);
+
+        HashMap<Integer, int[]> outputMapping = new HashMap<>();
+        outputMapping.put(0, new int[] { 1, 2 });
+        outputMapping.put(1, new int[] { 0, 2 });
+        outputMapping.put(2, new int[] { 0, 1 });
+
+        SwitchOperatorDescriptor switchOp = new SwitchOperatorDescriptor(spec, scannerDesc, outputArity,
+                new TupleFieldEvaluatorFactory(0), BinaryIntegerInspectorImpl.FACTORY, outputMapping);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, switchOp,
+                new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+
+        IOperatorDescriptor outputOp[] = new IOperatorDescriptor[outputFile.length];
+        for (int i = 0; i < outputArity; i++) {
+            outputOp[i] = new LineFileWriteOperatorDescriptor(spec, new FileSplit[] { outputFileSplit[i] });
+            PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, outputOp[i],
+                    new String[] { AlgebricksHyracksIntegrationUtil.NC1_ID });
+        }
+
+        spec.connect(new OneToOneConnectorDescriptor(spec), intScanner, 0, switchOp, 0);
+        for (int i = 0; i < outputArity; i++) {
+            spec.connect(new OneToOneConnectorDescriptor(spec), switchOp, i, outputOp[i], 0);
+        }
+
+        for (int i = 0; i < outputArity; i++) {
+            spec.addRoot(outputOp[i]);
+        }
+        AlgebricksHyracksIntegrationUtil.runJob(spec);
+
+        for (int i = 0; i < outputArity; i++) {
+            compareFiles("data" + File.separator + "device0" + File.separator + inputFileName[i + 1],
+                    outputFile[i].getAbsolutePath());
+        }
+    }
+
     @Test
     public void scanMicroSortWrite() throws Exception {
         JobSpecification spec = new JobSpecification(FRAME_SIZE);
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
index 4c728cec22..666c25008b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/base/AbstractReplicateOperatorDescriptor.java
@@ -35,10 +35,11 @@ import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
 import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
 
 /**
- * Abstract class for two replication related operator descriptor - replicate and split
+ * Abstract class for three replication related operator descriptors - replicate, split, and switch.
  * Replicate operator propagates all frames to all output branches.
  * That is, each tuple will be propagated to all output branches.
  * Split operator propagates each tuple in a frame to one output branch only.
+ * Switch is a generalization of split that propagates tuples based on a given output mapping.
  */
 public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor {
     protected static final long serialVersionUID = 1L;