You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/01 00:05:42 UTC

[6/6] git commit: WIP fragmentation, physical plan, byte compiling, some vector work

WIP fragmentation, physical plan, byte compiling, some vector work


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/f0be80dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/f0be80dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/f0be80dc

Branch: refs/heads/execwork
Commit: f0be80dcdaf22e2bb0a428b202c43d03ed063eb6
Parents: 5ede21f
Author: Jacques Nadeau <ja...@apache.org>
Authored: Mon Apr 22 23:04:58 2013 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 30 15:01:38 2013 -0700

----------------------------------------------------------------------
 sandbox/prototype/common/pom.xml                   |    5 +-
 .../org/apache/drill/common/PlanProperties.java    |    4 +
 .../apache/drill/common/config/DrillConfig.java    |    6 +-
 .../org/apache/drill/common/defs/PartitionDef.java |   14 +-
 .../drill/common/expression/types/DataType.java    |    4 +
 .../drill/common/physical/EndpointAffinity.java    |   60 ++++
 .../apache/drill/common/physical/OperatorCost.java |   61 ++++
 .../apache/drill/common/physical/POPConfig.java    |   24 --
 .../org/apache/drill/common/physical/POPCost.java  |   34 --
 .../apache/drill/common/physical/PhysicalPlan.java |   37 ++-
 .../org/apache/drill/common/physical/SetSpec.java  |   36 --
 .../apache/drill/common/physical/StitchDef.java    |   48 ---
 .../drill/common/physical/pop/ExchangePOP.java     |   54 ----
 .../drill/common/physical/pop/FieldCombinePOP.java |   28 --
 .../common/physical/pop/FieldSubdividePOP.java     |   22 --
 .../apache/drill/common/physical/pop/Filter.java   |   52 +++
 .../apache/drill/common/physical/pop/POPBase.java  |   65 ----
 .../physical/pop/PartitionToRandomExchange.java    |   92 ++++++
 .../common/physical/pop/PhysicalOperator.java      |   35 --
 .../apache/drill/common/physical/pop/Project.java  |   53 +++
 .../drill/common/physical/pop/ProjectPOP.java      |   53 ---
 .../common/physical/pop/QuickNWaySortPOP.java      |   50 ---
 .../apache/drill/common/physical/pop/ScanPOP.java  |   55 ----
 .../apache/drill/common/physical/pop/Screen.java   |   77 +++++
 .../drill/common/physical/pop/SingleChildPOP.java  |   41 ---
 .../apache/drill/common/physical/pop/SinkPOP.java  |   22 --
 .../org/apache/drill/common/physical/pop/Sort.java |   57 ++++
 .../apache/drill/common/physical/pop/SortPOP.java  |   54 ----
 .../drill/common/physical/pop/SourcePOP.java       |   22 --
 .../apache/drill/common/physical/pop/StorePOP.java |   61 ----
 .../common/physical/pop/base/AbstractBase.java     |   56 ++++
 .../common/physical/pop/base/AbstractExchange.java |   68 ++++
 .../physical/pop/base/AbstractPhysicalVisitor.java |   80 +++++
 .../common/physical/pop/base/AbstractReceiver.java |   32 ++
 .../common/physical/pop/base/AbstractScan.java     |   62 ++++
 .../common/physical/pop/base/AbstractSender.java   |   29 ++
 .../common/physical/pop/base/AbstractSingle.java   |   48 +++
 .../common/physical/pop/base/AbstractStore.java    |   42 +++
 .../drill/common/physical/pop/base/Exchange.java   |   69 ++++
 .../common/physical/pop/base/ExchangeCost.java     |   55 ++++
 .../common/physical/pop/base/FragmentLeaf.java     |   25 ++
 .../common/physical/pop/base/FragmentRoot.java     |   25 ++
 .../common/physical/pop/base/HasAffinity.java      |   26 ++
 .../drill/common/physical/pop/base/Leaf.java       |   21 ++
 .../common/physical/pop/base/PhysicalOperator.java |   59 ++++
 .../physical/pop/base/PhysicalOperatorUtil.java    |   34 ++
 .../common/physical/pop/base/PhysicalVisitor.java  |   43 +++
 .../drill/common/physical/pop/base/Receiver.java   |   38 +++
 .../drill/common/physical/pop/base/Root.java       |   24 ++
 .../drill/common/physical/pop/base/Scan.java       |   36 ++
 .../drill/common/physical/pop/base/Sender.java     |   29 ++
 .../drill/common/physical/pop/base/Store.java      |   30 ++
 .../drill/common/physical/props/OrderProp.java     |   45 ---
 .../drill/common/physical/props/PartitionProp.java |   36 --
 .../drill/common/physical/props/PhysicalProp.java  |   24 --
 .../drill/common/physical/props/SegmentProp.java   |   42 ---
 .../common/src/main/protobuf/Coordination.proto    |   26 ++
 .../apache/drill/common/physical/MockScanPOP.java  |   30 ++-
 .../apache/drill/common/physical/MockStorePOP.java |   46 ++--
 .../drill/common/physical/ParsePhysicalPlan.java   |    8 +-
 .../apache/drill/common/physical/ParsePlan.java    |   36 --
 .../common/src/test/resources/drill-module.conf    |    2 +-
 .../common/src/test/resources/dsort-physical.json  |   66 ----
 .../common/src/test/resources/physical_test1.json  |   33 ++
 sandbox/prototype/exec/java-exec/pom.xml           |   35 ++-
 .../org/apache/drill/exec/BufferAllocator.java     |   52 ---
 .../java/org/apache/drill/exec/ByteReorder.java    |   54 ++++
 .../apache/drill/exec/DirectBufferAllocator.java   |   47 ---
 .../java/org/apache/drill/exec/ExecConstants.java  |    1 +
 .../apache/drill/exec/cache/DistributedCache.java  |    8 +-
 .../org/apache/drill/exec/cache/HazelCache.java    |   15 +-
 .../org/apache/drill/exec/client/DrillClient.java  |   20 +-
 .../drill/exec/compile/ClassBodyBuilder.java       |  247 +++++++++++++++
 .../apache/drill/exec/compile/ClassCompiler.java   |   29 ++
 .../drill/exec/compile/ClassTransformer.java       |  210 ++++++++++++
 .../drill/exec/compile/JDKClassCompiler.java       |  177 +++++++++++
 .../drill/exec/compile/JaninoClassCompiler.java    |   62 ++++
 .../drill/exec/compile/QueryClassLoader.java       |   80 +++++
 .../exec/compile/TemplateClassDefinition.java      |   58 ++++
 .../drill/exec/coord/ClusterCoordinator.java       |    4 +-
 .../exec/coord/DrillServiceInstanceHelper.java     |    4 +-
 .../drill/exec/coord/ZKClusterCoordinator.java     |   21 +-
 .../exception/ClassTransformationException.java    |   47 +++
 .../exec/exception/FragmentSetupException.java     |   42 +++
 .../apache/drill/exec/foreman/CancelableQuery.java |   22 ++
 .../drill/exec/foreman/ExecutionPlanner.java       |   24 ++
 .../org/apache/drill/exec/foreman/Foreman.java     |   39 +++
 .../apache/drill/exec/foreman/QueryWorkUnit.java   |   54 ++++
 .../apache/drill/exec/foreman/ResourceRequest.java |   30 ++
 .../apache/drill/exec/foreman/StatusProvider.java  |   24 ++
 .../apache/drill/exec/memory/BufferAllocator.java  |   58 ++++
 .../drill/exec/memory/DirectBufferAllocator.java   |   58 ++++
 .../exec/metrics/SingleThreadNestedCounter.java    |   55 ++++
 .../org/apache/drill/exec/ops/BatchIterator.java   |   32 --
 .../exec/ops/FilteringRecordBatchTransformer.java  |   58 ++++
 .../org/apache/drill/exec/ops/FragmentContext.java |   47 +++-
 .../apache/drill/exec/ops/FragmentConverter.java   |   30 ++
 .../org/apache/drill/exec/ops/FragmentRoot.java    |   37 +++
 .../org/apache/drill/exec/ops/OperatorFactory.java |   22 ++
 .../org/apache/drill/exec/ops/QueryContext.java    |   51 +++
 .../org/apache/drill/exec/ops/QueryOutcome.java    |   22 --
 .../java/org/apache/drill/exec/ops/ScanBatch.java  |    2 +-
 .../drill/exec/ops/StreamingRecordBatch.java       |   25 ++
 .../exec/ops/exchange/ExchangeRecordBatch.java     |   22 ++
 .../exec/ops/exchange/PartitioningSender.java      |   23 --
 .../drill/exec/ops/exchange/RandomReceiver.java    |   24 --
 .../drill/exec/ops/filter/FilterRecordBatch.java   |  109 +++++++
 .../exec/ops/filter/SelectionVectorUpdater.java    |   80 +++++
 .../org/apache/drill/exec/planner/ExecPlanner.java |    9 +-
 .../drill/exec/planner/FragmentMaterializer.java   |   86 +++++
 .../apache/drill/exec/planner/FragmentNode.java    |  138 ++++++++
 .../drill/exec/planner/FragmentPlanningSet.java    |   61 ++++
 .../drill/exec/planner/FragmentRunnable.java       |  124 ++++++++
 .../drill/exec/planner/FragmentScheduler.java      |   32 ++
 .../apache/drill/exec/planner/FragmentStats.java   |   63 ++++
 .../drill/exec/planner/FragmentStatsCollector.java |  109 +++++++
 .../apache/drill/exec/planner/FragmentVisitor.java |   22 ++
 .../apache/drill/exec/planner/FragmentWrapper.java |  127 ++++++++
 .../exec/planner/FragmentingPhysicalVisitor.java   |   71 ++++
 .../drill/exec/planner/MaterializedFragment.java   |   69 ++++
 .../drill/exec/planner/PhysicalPlanReader.java     |   47 +++
 .../org/apache/drill/exec/planner/ScanFinder.java  |   54 ++++
 .../drill/exec/planner/SimpleExecPlanner.java      |   54 ++++
 .../drill/exec/planner/SimpleParallelizer.java     |  147 +++++++++
 .../exec/pop/receiver/NWayOrderingReceiver.java    |   52 +++
 .../drill/exec/pop/receiver/RandomReceiver.java    |   55 ++++
 .../drill/exec/pop/sender/HashPartitionSender.java |   49 +++
 .../org/apache/drill/exec/record/BatchSchema.java  |   29 ++-
 .../drill/exec/record/MaterializedField.java       |    4 +
 .../record/vector/AbstractFixedValueVector.java    |    2 +-
 .../drill/exec/record/vector/BaseValueVector.java  |    6 +-
 .../apache/drill/exec/record/vector/BitVector.java |  104 +++++--
 .../drill/exec/record/vector/ByteVector.java       |    2 +-
 .../drill/exec/record/vector/Int16Vector.java      |   52 +++
 .../drill/exec/record/vector/Int32Vector.java      |    4 +-
 .../exec/record/vector/NullableInt32Vector.java    |   47 +++
 .../exec/record/vector/NullableValueVector.java    |   18 +-
 .../drill/exec/record/vector/SelectionVector.java  |   31 ++
 .../drill/exec/record/vector/UInt16Vector.java     |   51 +++
 .../drill/exec/record/vector/ValueVector.java      |    1 +
 .../drill/exec/record/vector/VariableVector.java   |    2 +-
 .../java/org/apache/drill/exec/rpc/bit/BitCom.java |   26 ++-
 .../apache/drill/exec/rpc/bit/BitComHandler.java   |    2 +-
 .../org/apache/drill/exec/rpc/bit/BitComImpl.java  |   24 +-
 .../apache/drill/exec/rpc/bit/SendProgress.java    |   22 ++
 .../org/apache/drill/exec/rpc/user/UserClient.java |    4 +
 .../org/apache/drill/exec/rpc/user/UserServer.java |    2 +-
 .../org/apache/drill/exec/server/Drillbit.java     |   11 +-
 .../apache/drill/exec/server/DrillbitContext.java  |   27 ++-
 .../drill/exec/store/AbstractStorageEngine.java    |   83 +++++
 .../org/apache/drill/exec/store/StorageEngine.java |   32 ++-
 .../drill/exec/store/StorageEngineRegistry.java    |    4 +-
 .../java-exec/src/main/protobuf/Coordination.proto |   32 --
 .../src/main/protobuf/ExecutionProtos.proto        |   72 +++--
 .../java-exec/src/main/protobuf/SchemaDef.proto    |   13 +-
 .../java-exec/src/test/java/BBOutputStream.java    |   38 ---
 .../src/test/java/CompressingBytesColumn.java      |   46 ---
 .../exec/java-exec/src/test/java/ExternalSort.java |   21 --
 .../src/test/java/GenerateExternalSortData.java    |  124 --------
 .../exec/compile/ExampleExternalInterface.java     |   23 ++
 .../exec/compile/ExampleInternalInterface.java     |   24 ++
 .../apache/drill/exec/compile/ExampleTemplate.java |   30 ++
 .../exec/compile/TestClassCompilationTypes.java    |   67 ++++
 .../exec/compile/TestClassTransformation.java      |   53 +++
 .../org/apache/drill/exec/pop/CheckFragmenter.java |   86 +++++
 .../apache/drill/exec/pop/CheckInjectionValue.java |   61 ++++
 .../apache/drill/exec/rpc/user/RunRemoteQuery.java |   41 +++
 .../apache/drill/exec/server/StartDrillbit.java    |    4 +-
 .../apache/drill/exec/store/MockRecordConfig.java  |   46 +++
 .../apache/drill/exec/store/MockRecordReader.java  |  108 +++++++
 .../apache/drill/exec/store/MockStorageEngine.java |   54 ++++
 .../java-exec/src/test/resources/drill-module.conf |    7 +-
 .../src/test/resources/physical_screen.json        |   25 ++
 .../test/resources/physical_simpleexchange.json    |   41 +++
 .../prototype/exec/java-exec/src/test/sh/runbit    |    2 +-
 sandbox/prototype/pom.xml                          |   16 +-
 176 files changed, 6097 insertions(+), 1608 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/pom.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/pom.xml b/sandbox/prototype/common/pom.xml
index 9702dbe..3a15c05 100644
--- a/sandbox/prototype/common/pom.xml
+++ b/sandbox/prototype/common/pom.xml
@@ -8,7 +8,7 @@
 		<groupId>org.apache.drill</groupId>
 		<version>1.0-SNAPSHOT</version>
 	</parent>
-	
+
 	<artifactId>common</artifactId>
 	<packaging>jar</packaging>
 	<name>common</name>
@@ -18,7 +18,7 @@
 		<dependency>
 			<groupId>com.google.protobuf</groupId>
 			<artifactId>protobuf-java</artifactId>
-			<version>2.4.1</version>
+			<version>2.5.0</version>
 		</dependency>
 
 		<dependency>
@@ -67,6 +67,7 @@
 		</dependency>
 
 
+
 	</dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
index 57d367a..c532e18 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/PlanProperties.java
@@ -17,6 +17,9 @@
  ******************************************************************************/
 package org.apache.drill.common;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
 
 public class PlanProperties {
   public static enum PlanType {APACHE_DRILL_LOGICAL, APACHE_DRILL_PHYSICAL}
@@ -25,6 +28,7 @@ public class PlanProperties {
   public int version;
 	public Generator generator = new Generator();
 	
+	@JsonInclude(Include.NON_NULL)
 	public static class Generator{
 		public String type;
 		public String info;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index 5750aaf..2b8f45d 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -23,18 +23,18 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.CopyOnWriteArrayList;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.drill.common.exceptions.DrillConfigurationException;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.StorageEngineConfigBase;
 import org.apache.drill.common.logical.data.LogicalOperatorBase;
-import org.apache.drill.common.physical.pop.POPBase;
+import org.apache.drill.common.physical.pop.base.PhysicalOperatorUtil;
 import org.apache.drill.common.util.PathScanner;
 
 import com.fasterxml.jackson.core.JsonParser.Feature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
@@ -56,7 +56,7 @@ public final class DrillConfig extends NestedConfig{
     mapper.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
     mapper.configure(Feature.ALLOW_COMMENTS, true);
     mapper.registerSubtypes(LogicalOperatorBase.getSubTypes(this));
-    mapper.registerSubtypes(POPBase.getSubTypes(this));
+    mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(this));
     mapper.registerSubtypes(StorageEngineConfigBase.getSubTypes(this));
     
   };

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
index 181c327..45298df 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/defs/PartitionDef.java
@@ -26,13 +26,13 @@ public class PartitionDef {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionDef.class);
 
   private final PartitionType partitionType;
-  private final LogicalExpression[] expressions;
+  private final LogicalExpression expr;
   private final LogicalExpression[] starts;
   
   @JsonCreator
-  public PartitionDef(@JsonProperty("mode") PartitionType partitionType, @JsonProperty("exprs") LogicalExpression[] expressions, @JsonProperty("starts") LogicalExpression[] starts) {
+  public PartitionDef(@JsonProperty("mode") PartitionType partitionType, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("starts") LogicalExpression[] starts) {
     this.partitionType = partitionType;
-    this.expressions = expressions;
+    this.expr = expr;
     this.starts = starts;
   }
 
@@ -41,9 +41,9 @@ public class PartitionDef {
     return partitionType;
   }
 
-  @JsonProperty("exprs")
-  public LogicalExpression[] getExpressions() {
-    return expressions;
+  @JsonProperty("expr")
+  public LogicalExpression getExpr() {
+    return expr;
   }
 
   @JsonProperty("starts")
@@ -53,6 +53,6 @@ public class PartitionDef {
   
 
   public static enum PartitionType{ 
-    RANDOM, HASH, RANGE;
+    DUPLICATE, RANDOM, HASH, RANGE;
   };
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
index 60d26dc..25b82a7 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/expression/types/DataType.java
@@ -51,6 +51,8 @@ public abstract class DataType {
   public abstract Comparability getComparability();
   public abstract boolean isNumericType();
   
+  
+  
   public static final DataType LATEBIND = new LateBindType();
   public static final DataType BOOLEAN = new AtomType("BOOLEAN", Comparability.EQUAL, false);
   public static final DataType BYTES = new AtomType("BYTES", Comparability.ORDERED, false);
@@ -61,6 +63,8 @@ public abstract class DataType {
   public static final DataType FLOAT64 = new AtomType("FLOAT64", Comparability.ORDERED, true);
   public static final DataType INT64 = new AtomType("INT64", Comparability.ORDERED, true);
   public static final DataType INT32 = new AtomType("INT32", Comparability.ORDERED, true);
+  public static final DataType INT16 = new AtomType("INT16", Comparability.ORDERED, true);
+  public static final DataType UINT16 = new AtomType("UINT16", Comparability.ORDERED, true);
 //  public static final DataType INT16 = new AtomType("int16", Comparability.ORDERED, true);
 //  public static final DataType BIG_INTEGER = new AtomType("bigint", Comparability.ORDERED, true);
 //  public static final DataType BIG_DECIMAL = new AtomType("bigdecimal", Comparability.ORDERED, true);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/EndpointAffinity.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/EndpointAffinity.java
new file mode 100644
index 0000000..9ccf430
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/EndpointAffinity.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+
+public class EndpointAffinity implements Comparable<EndpointAffinity>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
+  
+  private DrillbitEndpoint endpoint;
+  private float affinity = 0.0f;
+  
+  public EndpointAffinity(DrillbitEndpoint endpoint) {
+    super();
+    this.endpoint = endpoint;
+  }
+  
+  public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    super();
+    this.endpoint = endpoint;
+    this.affinity = affinity;
+  }
+
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+  public void setEndpoint(DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+  public float getAffinity() {
+    return affinity;
+  }
+  
+  @Override
+  public int compareTo(EndpointAffinity o) {
+    return Float.compare(affinity, o.affinity);
+  }
+  
+  public void addAffinity(float f){
+    affinity += f;
+  }
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/OperatorCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/OperatorCost.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/OperatorCost.java
new file mode 100644
index 0000000..fadfff0
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/OperatorCost.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OperatorCost {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCost.class);
+  
+  private final float network; 
+  private final float disk;
+  private final float memory;
+  private final float cpu;
+  
+  @JsonCreator
+  public OperatorCost(@JsonProperty("network") float network, @JsonProperty("disk") float disk, @JsonProperty("memory") float memory, @JsonProperty("cpu") float cpu) {
+    super();
+    this.network = network;
+    this.disk = disk;
+    this.memory = memory;
+    this.cpu = cpu;
+  }
+
+  public float getNetwork() {
+    return network;
+  }
+
+  public float getDisk() {
+    return disk;
+  }
+
+  public float getMemory() {
+    return memory;
+  }
+
+  public float getCpu() {
+    return cpu;
+  }
+  
+  public static OperatorCost combine(OperatorCost c1, OperatorCost c2){
+    return new OperatorCost(c1.network + c2.network, c1.disk + c2.disk, c1.memory + c2.memory, c1.cpu + c2.cpu);
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPConfig.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPConfig.java
deleted file mode 100644
index 39a91f2..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPConfig.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-public class POPConfig {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(POPConfig.class);
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPCost.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPCost.java
deleted file mode 100644
index b2ee440..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/POPCost.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-public class POPCost {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(POPCost.class);
-  
-  long outputRecordCount;
-  long outputRecordSize;
-  
-  
-  public POPCost(long outputRecordCount, long outputRecordSize) {
-    super();
-    this.outputRecordCount = outputRecordCount;
-    this.outputRecordSize = outputRecordSize;
-  }
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/PhysicalPlan.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/PhysicalPlan.java
index b81ca42..e83dac7 100644
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/PhysicalPlan.java
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/PhysicalPlan.java
@@ -19,22 +19,22 @@ package org.apache.drill.common.physical;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.drill.common.PlanProperties;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.graph.Graph;
 import org.apache.drill.common.graph.GraphAlgos;
-import org.apache.drill.common.logical.StorageEngineConfig;
-import org.apache.drill.common.physical.pop.PhysicalOperator;
-import org.apache.drill.common.physical.pop.SinkPOP;
-import org.apache.drill.common.physical.pop.SourcePOP;
+import org.apache.drill.common.physical.pop.base.Leaf;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Root;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.collect.Lists;
 
 @JsonPropertyOrder({ "head", "graph" })
@@ -42,19 +42,29 @@ public class PhysicalPlan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class);
   
   PlanProperties properties;
-  Graph<PhysicalOperator, SinkPOP, SourcePOP> graph;
+  
+  Graph<PhysicalOperator, Root, Leaf> graph;
   
   @JsonCreator
   public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
     this.properties = properties;
-    this.graph = Graph.newGraph(operators, SinkPOP.class, SourcePOP.class);
+    this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
   }
   
   @JsonProperty("graph")
   public List<PhysicalOperator> getSortedOperators(){
-    List<PhysicalOperator> list = GraphAlgos.TopoSorter.sort(graph);
     // reverse the list so that nested references are flattened rather than nested.
-    return Lists.reverse(list);
+    return getSortedOperators(true);
+  }
+  
+  public List<PhysicalOperator> getSortedOperators(boolean reverse){
+    List<PhysicalOperator> list = GraphAlgos.TopoSorter.sort(graph);
+    if(reverse){
+      return Lists.reverse(list);
+    }else{
+      return list;
+    }
+    
   }
 
 
@@ -64,10 +74,9 @@ public class PhysicalPlan {
   }
 
   /** Parses a physical plan. */
-  public static PhysicalPlan parse(DrillConfig config, String planString) {
-    ObjectMapper mapper = config.getMapper();
+  public static PhysicalPlan parse(ObjectReader reader, String planString) {
     try {
-      PhysicalPlan plan = mapper.readValue(planString, PhysicalPlan.class);
+      PhysicalPlan plan = reader.readValue(planString);
       return plan;
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -75,9 +84,9 @@ public class PhysicalPlan {
   }
 
   /** Converts a physical plan to a string. (Opposite of {@link #parse}.) */
-  public String unparse(DrillConfig config) {
+  public String unparse(ObjectWriter writer) {
     try {
-      return config.getMapper().writeValueAsString(this);
+      return writer.writeValueAsString(this);
     } catch (JsonProcessingException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/SetSpec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/SetSpec.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/SetSpec.java
deleted file mode 100644
index 5250dbb..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/SetSpec.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import java.util.List;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.props.PhysicalProp;
-
-public class SetSpec {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetSpec.class);
-
-  private List<Field> fields;
-  private List<PhysicalProp> traits;
-
-  public class Field {
-    public String name;
-    public DataType type;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/StitchDef.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/StitchDef.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/StitchDef.java
deleted file mode 100644
index d9a7d33..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/StitchDef.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical;
-
-import org.apache.drill.common.expression.LogicalExpression;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public class StitchDef {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StitchDef.class);
-  
-  public static enum StitchMode {RANDOM, NWAY, BLOCK} 
-  
-  private StitchMode mode;
-  private LogicalExpression[] exprs;
-  
-  @JsonCreator 
-  public StitchDef(@JsonProperty("pattern") StitchMode mode, @JsonProperty("exprs") LogicalExpression[] exprs) {
-    super();
-    this.mode = mode;
-    this.exprs = exprs;
-  }
-
-  public StitchMode getMode() {
-    return mode;
-  }
-
-  public LogicalExpression[] getExprs() {
-    return exprs;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ExchangePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ExchangePOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ExchangePOP.java
deleted file mode 100644
index 757f03b..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ExchangePOP.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.defs.PartitionDef;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.StitchDef;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("exchange")
-public class ExchangePOP extends SingleChildPOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangePOP.class);
-  
-  private PartitionDef partition;
-  private StitchDef stitch;
-  
-  @JsonCreator
-  public ExchangePOP(@JsonProperty("fields") FieldSet fields, @JsonProperty("partition") PartitionDef partition, @JsonProperty("stitch") StitchDef stitch) {
-    super(fields);
-    this.partition = partition;
-    this.stitch = stitch;
-  }
-  
-  public PartitionDef getPartition() {
-    return partition;
-  }
-
-  public StitchDef getStitch() {
-    return stitch;
-  }
-
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldCombinePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldCombinePOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldCombinePOP.java
deleted file mode 100644
index ac7e036..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldCombinePOP.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-/**
- * Creates a complex field out of two or more component fields
- */
-public class FieldCombinePOP {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldCombinePOP.class);
-  
-  // fieldsInSortOrder
-  private int[] fieldIds; 
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldSubdividePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldSubdividePOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldSubdividePOP.java
deleted file mode 100644
index c5bd1f9..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/FieldSubdividePOP.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-public class FieldSubdividePOP {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FieldSubdividePOP.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Filter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Filter.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Filter.java
new file mode 100644
index 0000000..2c86d99
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Filter.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.physical.pop.base.AbstractSingle;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("filter")
+public class Filter extends AbstractSingle {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filter.class);
+
+  private final LogicalExpression expr;
+  
+  @JsonCreator
+  public Filter(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr) {
+    super(child);
+    this.expr = expr;
+  }
+
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitFilter(this, value);
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/POPBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/POPBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/POPBase.java
deleted file mode 100644
index 5d44e2a..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/POPBase.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import org.apache.drill.common.config.CommonConstants;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.graph.GraphVisitor;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.POPCost;
-import org.apache.drill.common.util.PathScanner;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-public abstract class POPBase implements PhysicalOperator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(POPBase.class);
-  
-  private FieldSet fieldSet;
-  
-  
-  public POPBase(FieldSet fieldSet){
-    this.fieldSet = fieldSet;
-  }
-  
-  public synchronized static Class<?>[] getSubTypes(DrillConfig config){
-    Class<?>[] ops = PathScanner.scanForImplementationsArr(PhysicalOperator.class, config.getStringList(CommonConstants.PHYSICAL_OPERATOR_SCAN_PACKAGES));
-    logger.debug("Adding Physical Operator sub types: {}", ((Object) ops) );
-    return ops;
-  }
-  
-  @JsonProperty("fields")
-  public FieldSet getFieldSet(){
-    return fieldSet;
-  }
-
-  @Override
-  public void accept(GraphVisitor<PhysicalOperator> visitor) {
-    visitor.enter(this);
-    if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this);
-    for(PhysicalOperator o : this){
-      o.accept(visitor);  
-    }
-    visitor.leave(this);
-  }
-
-  @Override
-  public POPCost getCost() {
-    return null;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PartitionToRandomExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PartitionToRandomExchange.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PartitionToRandomExchange.java
new file mode 100644
index 0000000..0289780
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PartitionToRandomExchange.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop;
+
+import java.util.List;
+
+import org.apache.drill.common.defs.PartitionDef;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.physical.OperatorCost;
+import org.apache.drill.common.physical.pop.base.AbstractExchange;
+import org.apache.drill.common.physical.pop.base.ExchangeCost;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Receiver;
+import org.apache.drill.common.physical.pop.base.Sender;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("partition-to-random-exchange")
+public class PartitionToRandomExchange extends AbstractExchange{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionToRandomExchange.class);
+
+  private final PartitionDef partition;
+  private final int maxWidth;
+  
+  @JsonCreator
+  public PartitionToRandomExchange(@JsonProperty("child") PhysicalOperator child, @JsonProperty("partition") PartitionDef partition, @JsonProperty("cost") ExchangeCost cost) {
+    super(child, cost);
+    this.partition = partition;
+    
+    LogicalExpression[] parts = partition.getStarts();
+    if(parts != null && parts.length > 0){
+      this.maxWidth = parts.length+1;
+    }else{
+      this.maxWidth = Integer.MAX_VALUE;
+    }
+  }
+
+  public PartitionDef getPartition() {
+    return partition;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitExchange(this,  value);
+  }
+
+  @Override
+  public int getMaxSendWidth() {
+    return maxWidth;
+  }
+
+  @Override
+  public void setupSenders(List<DrillbitEndpoint> senderLocations) {
+  }
+
+  @Override
+  public void setupReceivers(List<DrillbitEndpoint> receiverLocations) {
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    return null;
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return null;
+  }
+
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PhysicalOperator.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PhysicalOperator.java
deleted file mode 100644
index 0a8927a..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/PhysicalOperator.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import org.apache.drill.common.graph.GraphValue;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.POPCost;
-
-import com.fasterxml.jackson.annotation.JsonIdentityInfo;
-import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import com.fasterxml.jackson.annotation.ObjectIdGenerators;
-
-@JsonPropertyOrder({"@id"})
-@JsonIdentityInfo(generator=ObjectIdGenerators.IntSequenceGenerator.class, property="@id")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator>{
-  public FieldSet getFieldSet();
-  public POPCost getCost();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Project.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Project.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Project.java
new file mode 100644
index 0000000..7cff28d
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Project.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.physical.pop.base.AbstractSingle;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("project")
+public class Project extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Project.class);
+
+  private final List<NamedExpression> exprs;
+  
+  @JsonCreator
+  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+    super(child);
+    this.exprs = exprs;
+  }
+
+  public List<NamedExpression> getExprs() {
+    return exprs;
+  }
+
+  
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitProject(this, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ProjectPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ProjectPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ProjectPOP.java
deleted file mode 100644
index bd481d4..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ProjectPOP.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.List;
-
-import org.apache.drill.common.defs.PartitionDef;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.StitchDef;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("project")
-public class ProjectPOP extends SingleChildPOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectPOP.class);
-  
-  private List<Integer> fieldIds;
-  private List<LogicalExpression> exprs;
-  
-  @JsonCreator
-  public ProjectPOP(@JsonProperty("output") FieldSet fields, @JsonProperty("fields") List<Integer> fieldIds, @JsonProperty("exprs") List<LogicalExpression> exprs) {
-    super(fields);
-    this.fieldIds = fieldIds;
-    this.exprs = exprs;
-  }
-
-  public List<Integer> getFields() {
-    return fieldIds;
-  }
-
-  public List<LogicalExpression> getExprs() {
-    return exprs;
-  }
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/QuickNWaySortPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/QuickNWaySortPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/QuickNWaySortPOP.java
deleted file mode 100644
index f7fcdb0..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/QuickNWaySortPOP.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.defs.OrderDef;
-import org.apache.drill.common.physical.FieldSet;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("quicknwaysort")
-public class QuickNWaySortPOP extends SingleChildPOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QuickNWaySortPOP.class);
-  
-  private List<OrderDef> orderings;
-
-  @JsonCreator
-  public QuickNWaySortPOP(@JsonProperty("fields") FieldSet fieldSet, @JsonProperty("orderings") List<OrderDef> orderings) {
-    super(fieldSet);
-    this.orderings = orderings;
-  }
-
-  @JsonProperty("orderings")
-  public List<OrderDef> getOrderings() {
-    return orderings;
-  }
-
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ScanPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ScanPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ScanPOP.java
deleted file mode 100644
index 2aaf8fa..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/ScanPOP.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.drill.common.JSONOptions;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.ReadEntry;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
-public abstract class ScanPOP<T extends ReadEntry> extends POPBase implements SourcePOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanPOP.class);
-  
-  private List<T> readEntries;
-  
-  public ScanPOP(List<T> readEntries, FieldSet fieldSet) {
-    super(fieldSet);
-    this.readEntries = readEntries;
-  }
-
-  @JsonProperty("entries")
-  public List<T> getReadEntries() {
-    return readEntries;
-  }
-  
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.emptyIterator();
-  }
-
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Screen.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Screen.java
new file mode 100644
index 0000000..fdbd8f1
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Screen.java
@@ -0,0 +1,77 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.physical.EndpointAffinity;
+import org.apache.drill.common.physical.pop.base.AbstractStore;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Store;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("screen")
+public class Screen extends AbstractStore {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Screen.class);
+
+  private final DrillbitEndpoint endpoint;
+
+  public Screen(@JsonProperty("child") PhysicalOperator child, @JacksonInject DrillbitEndpoint endpoint) {
+    super(child);
+    this.endpoint = endpoint;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.singletonList(new EndpointAffinity(endpoint, 1000));
+  }
+
+  @Override
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    // we actually don't have to do anything since nothing should have changed. we'll check just check that things
+    // didn't get screwed up.
+    if (endpoints.size() != 1)
+      throw new UnsupportedOperationException("A Screen operator can only be assigned to a single node.");
+    DrillbitEndpoint endpoint = endpoints.iterator().next();
+    if (this.endpoint != endpoint)
+      throw new UnsupportedOperationException("A Screen operator can only be assigned to its home node.");
+
+  }
+
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new Screen(child, endpoint);
+  }
+
+  @JsonIgnore
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SingleChildPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SingleChildPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SingleChildPOP.java
deleted file mode 100644
index cf0c08b..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SingleChildPOP.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.Iterator;
-
-import org.apache.drill.common.physical.FieldSet;
-
-import com.google.common.collect.Iterators;
-
-public abstract class SingleChildPOP extends POPBase{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleChildPOP.class);
-  
-  public PhysicalOperator child;
-
-  public SingleChildPOP(FieldSet fieldSet) {
-    super(fieldSet);
-  }
-
-  @Override
-  public Iterator<PhysicalOperator> iterator() {
-    return Iterators.singletonIterator(child);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SinkPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SinkPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SinkPOP.java
deleted file mode 100644
index da0dcd6..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SinkPOP.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-public interface SinkPOP extends PhysicalOperator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SinkPOP.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Sort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Sort.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Sort.java
new file mode 100644
index 0000000..b4d802d
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/Sort.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop;
+
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.physical.pop.base.AbstractSingle;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("sort")
+public class Sort extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
+  
+  private final LogicalExpression expr;
+  private boolean reverse = false;
+  
+  @JsonCreator
+  public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("expr") LogicalExpression expr, @JsonProperty("reverse") boolean reverse) {
+    super(child);
+    this.expr = expr;
+    this.reverse = reverse;
+  }
+  
+  public LogicalExpression getExpr() {
+    return expr;
+  }
+
+  public boolean getReverse() {
+    return reverse;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSort(this, value);
+  }
+    
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SortPOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SortPOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SortPOP.java
deleted file mode 100644
index 4d0263b..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SortPOP.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.List;
-
-import org.apache.drill.common.defs.PartitionDef;
-import org.apache.drill.common.expression.LogicalExpression;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.StitchDef;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("sort")
-public class SortPOP extends SingleChildPOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortPOP.class);
-  
-  private int field;
-  private boolean reverse = false;
-  
-  @JsonCreator
-  public SortPOP(@JsonProperty("output") FieldSet fields, @JsonProperty("field") int field, @JsonProperty("reverse") boolean reverse) {
-    super(fields);
-    this.field = field;
-    this.reverse = reverse;
-  }
-
-  public int getField() {
-    return field;
-  }
-
-  public boolean getReverse() {
-    return reverse;
-  }
-    
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SourcePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SourcePOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SourcePOP.java
deleted file mode 100644
index 1b7c8e9..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/SourcePOP.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-public interface SourcePOP extends PhysicalOperator{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SourcePOP.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/StorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/StorePOP.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/StorePOP.java
deleted file mode 100644
index 2b8e075..0000000
--- a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/StorePOP.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.common.physical.pop;
-
-import java.util.List;
-
-import org.apache.drill.common.JSONOptions;
-import org.apache.drill.common.defs.PartitionDef;
-import org.apache.drill.common.physical.FieldSet;
-import org.apache.drill.common.physical.WriteEntry;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-public abstract class StorePOP<T extends WriteEntry> extends SingleChildPOP implements SinkPOP{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StorePOP.class);
-
-  public static enum StoreMode {SYSTEM_CHOICE, PREDEFINED_PARTITIONS};
-  
-  private StoreMode mode;
-  private PartitionDef partition;
-  private List<T> entries;
-  
-  @JsonCreator
-  public StorePOP(FieldSet fieldSet, StoreMode mode, PartitionDef partition, List<T> entries) {
-    super(fieldSet);
-    this.mode = mode;
-    this.partition = partition;
-    this.entries = entries;
-  }
-
-  public StoreMode getMode() {
-    return mode;
-  }
-
-  public PartitionDef getPartition() {
-    return partition;
-  }
-
-  public List<T> getEntries(){
-    return entries;
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractBase.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractBase.java
new file mode 100644
index 0000000..5d3584c
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractBase.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop.base;
+
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.common.physical.OperatorCost;
+
+public abstract class AbstractBase implements PhysicalOperator{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
+
+  private OperatorCost cost;
+  
+  @Override
+  public void accept(GraphVisitor<PhysicalOperator> visitor) {
+    visitor.enter(this);
+    if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this);
+    for(PhysicalOperator o : this){
+      o.accept(visitor);  
+    }
+    visitor.leave(this);
+  }
+  
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return cost;
+  }
+  
+  // should be used only for the purposes of json...
+  void setCost(OperatorCost cost){
+    this.cost = cost;
+  }
+
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractExchange.java b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractExchange.java
new file mode 100644
index 0000000..1f60c53
--- /dev/null
+++ b/sandbox/prototype/common/src/main/java/org/apache/drill/common/physical/pop/base/AbstractExchange.java
@@ -0,0 +1,68 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.common.physical.pop.base;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.OperatorCost;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public abstract class AbstractExchange extends AbstractSingle implements Exchange {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractExchange.class);
+
+  private final ExchangeCost cost;
+
+  public AbstractExchange(PhysicalOperator child, ExchangeCost cost) {
+    super(child);
+    this.cost = cost;
+  }
+
+  /**
+   * Exchanges are not executable. The Execution layer first has to set their parallelization and convert them into
+   * something executable
+   */
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @Override
+  public OperatorCost getAggregateSendCost() {
+    return cost.getSend();
+  }
+
+  @Override
+  public OperatorCost getAggregateReceiveCost() {
+    return cost.getReceive();
+  }
+
+  @Override
+  public ExchangeCost getExchangeCost() {
+    return cost;
+  }
+
+  @JsonIgnore
+  @Override
+  public OperatorCost getCost() {
+    return cost.getCombinedCost();
+  }
+
+}