You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:24 UTC

[03/53] [abbrv] WIP fragmentation, physical plan, byte compiling, some vector work

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
new file mode 100644
index 0000000..0fc7a1f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
@@ -0,0 +1,25 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+/**
+ * Works on one incoming batch at a time.  Creates one output batch for each input batch.
+ */
+public class StreamingRecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingRecordBatch.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
new file mode 100644
index 0000000..07d7099
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.exchange;
+
+public class ExchangeRecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeRecordBatch.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
deleted file mode 100644
index 6640ef2..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-
-public class PartitioningSender {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitioningSender.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
deleted file mode 100644
index c9f8147..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class RandomReceiver {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
new file mode 100644
index 0000000..5bef612
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.filter;
+
+import org.apache.drill.exec.ops.FilteringRecordBatchTransformer;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public abstract class FilterRecordBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
+  private RecordBatch incoming;
+  private SelectionVector selectionVector;
+  private BatchSchema schema;
+  private FilteringRecordBatchTransformer transformer;
+  private int outstanding;
+
+  public FilterRecordBatch(RecordBatch batch) {
+    this.incoming = batch;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return 0;
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    return null;
+  }
+
+  abstract int applyFilter(SelectionVector vector, int count);
+
+  /**
+   * Release all assets.
+   */
+  private void close() {
+
+  }
+
+  @Override
+  public IterOutcome next() {
+    while (true) {
+      IterOutcome o = incoming.next();
+      switch (o) {
+      case OK_NEW_SCHEMA:
+        transformer = incoming.getContext().getFilteringExpression(null);
+        schema = transformer.getSchema();
+        // fall through to ok.
+      case OK:
+
+      case NONE:
+      case STOP:
+        close();
+        return IterOutcome.STOP;
+      }
+
+      if (outstanding > 0) {
+        // move data to output location.
+
+        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
+
+        }
+      }
+
+      // make sure the bit vector is as large as the current record batch.
+      if (selectionVector.size() < incoming.getRecordCount()) {
+        selectionVector.allocateNew(incoming.getRecordCount());
+      }
+
+      return null;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
new file mode 100644
index 0000000..218a19a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
@@ -0,0 +1,80 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.DirectBufferAllocator;
+import org.apache.drill.exec.record.vector.NullableInt32Vector;
+import org.apache.drill.exec.record.vector.UInt16Vector;
+import org.codehaus.janino.ExpressionEvaluator;
+
+public class SelectionVectorUpdater {
+  //static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorUpdater.class);
+
+  // Add a selection vector to a record batch.
+  /**
+   * where a + b < 10
+   */
+
+  public static int applyToBatch(final int recordCount, final NullableInt32Vector a, final NullableInt32Vector b,
+      final UInt16Vector selectionVector) {
+    int selectionIndex = 0;
+    for (int i = 0; i < recordCount; i++) {
+      int isNotNull = a.isNull(i) * b.isNull(i);
+      if (isNotNull > 0 && a.get(i) + b.get(i) < 10) {
+        selectionVector.set(selectionIndex, (char) i);
+        selectionIndex++;
+      }
+    }
+    return selectionIndex;
+  }
+
+  public static void mai2n(String[] args) {
+    int size = 1024;
+    BufferAllocator allocator = new DirectBufferAllocator();
+    NullableInt32Vector a = new NullableInt32Vector(0, allocator);
+    NullableInt32Vector b = new NullableInt32Vector(1, allocator);
+    UInt16Vector select = new UInt16Vector(2, allocator);
+    a.allocateNew(size);
+    b.allocateNew(size);
+    select.allocateNew(size);
+    int r = 0;
+    for (int i = 0; i < 1500; i++) {
+      r += applyToBatch(size, a, b, select);
+    }
+
+    System.out.println(r);
+  }
+  
+public static void main(String[] args) throws Exception{
+  ExpressionEvaluator ee = new ExpressionEvaluator(
+      "c > d ? c : d",                     // expression
+      int.class,                           // expressionType
+      new String[] { "c", "d" },           // parameterNames
+      new Class[] { int.class, int.class } // parameterTypes
+  );
+  
+  Integer res = (Integer) ee.evaluate(
+      new Object[] {          // parameterValues
+          new Integer(10),
+          new Integer(11),
+      }
+  );
+  System.out.println("res = " + res);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
index 9554bf3..7b76810 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
@@ -17,11 +17,18 @@
  ******************************************************************************/
 package org.apache.drill.exec.planner;
 
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+
 
 /**
  * Decides level of paralellization.
  * Generates smaller physical plans
  */
-public class ExecPlanner {
+public interface ExecPlanner {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecPlanner.class);
+  
+  public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws FragmentSetupException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
new file mode 100644
index 0000000..51b0691
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentMaterializer.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.AbstractStore;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Scan;
+import org.apache.drill.common.physical.pop.base.Store;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+public class FragmentMaterializer extends AbstractPhysicalVisitor<PhysicalOperator, FragmentMaterializer.IndexedFragmentNode, FragmentSetupException>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentMaterializer.class);
+
+  
+  @Override
+  public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws FragmentSetupException {
+    if(exchange == iNode.getNode().getSendingExchange()){
+      
+      // this is a sending exchange.
+      PhysicalOperator child = exchange.getChild();
+      return exchange.getSender(iNode.getMinorFragmentId(), child);
+      
+    }else{
+      // receiving exchange.
+      return exchange.getReceiver(iNode.getMinorFragmentId());
+    }
+  }
+
+  @Override
+  public PhysicalOperator visitScan(Scan<?> scan, IndexedFragmentNode iNode) throws FragmentSetupException {
+    return scan.getSpecificScan(iNode.getMinorFragmentId());
+  }
+
+  @Override
+  public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws FragmentSetupException {
+    PhysicalOperator child = store.getChild();
+    return store.getSpecificStore(child, iNode.getMinorFragmentId());
+  }
+
+  @Override
+  public PhysicalOperator visitUnknown(PhysicalOperator op, IndexedFragmentNode iNode) throws FragmentSetupException {
+    return op;
+  }
+  
+  public static class IndexedFragmentNode{
+    final FragmentWrapper info;
+    final int minorFragmentId;
+    
+    public IndexedFragmentNode(int minorFragmentId, FragmentWrapper info) {
+      super();
+      this.info = info;
+      this.minorFragmentId = minorFragmentId;
+    }
+
+    public FragmentNode getNode() {
+      return info.getNode();
+    }
+
+    public int getMinorFragmentId() {
+      return minorFragmentId;
+    }
+
+    public FragmentWrapper getInfo() {
+      return info;
+    }
+    
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
new file mode 100644
index 0000000..f53240e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentNode.java
@@ -0,0 +1,138 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+import com.google.common.collect.Lists;
+
+public class FragmentNode implements Iterable<FragmentNode.ExchangeFragmentPair>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentNode.class);
+  
+  private PhysicalOperator root;
+  private Exchange sendingExchange;
+  private final List<ExchangeFragmentPair> receivingExchangePairs = Lists.newLinkedList();
+  private FragmentStats stats = new FragmentStats();
+  
+  public void addOperator(PhysicalOperator o){
+    if(root == null){
+      root = o;
+    }
+  }
+  
+  public void addSendExchange(Exchange e) throws FragmentSetupException{
+    if(sendingExchange != null) throw new FragmentSetupException("Fragment was trying to add a second SendExchange.  ");
+    sendingExchange = e;
+  }
+  
+  public void addReceiveExchange(Exchange e, FragmentNode fragment){
+    this.receivingExchangePairs.add(new ExchangeFragmentPair(e, fragment));
+  }
+
+  @Override
+  public Iterator<ExchangeFragmentPair> iterator() {
+    return this.receivingExchangePairs.iterator();
+  }
+
+  public List<ExchangeFragmentPair> getReceivingExchangePairs() {
+    return receivingExchangePairs;
+  }
+
+  public PhysicalOperator getRoot() {
+    return root;
+  }
+
+  public Exchange getSendingExchange() {
+    return sendingExchange;
+  }
+
+  public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra){
+    return visitor.visit(this, extra);
+  }
+  
+  public FragmentStats getStats(){
+    return stats;
+  }
+  
+  public class ExchangeFragmentPair {
+    private Exchange exchange;
+    private FragmentNode node;
+    public ExchangeFragmentPair(Exchange exchange, FragmentNode node) {
+      super();
+      this.exchange = exchange;
+      this.node = node;
+    }
+    public Exchange getExchange() {
+      return exchange;
+    }
+    public FragmentNode getNode() {
+      return node;
+    }
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + FragmentNode.this.hashCode();
+      result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
+      result = prime * result + ((node == null) ? 0 : node.hashCode());
+      return result;
+    }
+    
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((receivingExchangePairs == null) ? 0 : receivingExchangePairs.hashCode());
+    result = prime * result + ((root == null) ? 0 : root.hashCode());
+    result = prime * result + ((sendingExchange == null) ? 0 : sendingExchange.hashCode());
+    result = prime * result + ((stats == null) ? 0 : stats.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) return true;
+    if (obj == null) return false;
+    if (getClass() != obj.getClass()) return false;
+    FragmentNode other = (FragmentNode) obj;
+    if (receivingExchangePairs == null) {
+      if (other.receivingExchangePairs != null) return false;
+    } else if (!receivingExchangePairs.equals(other.receivingExchangePairs)) return false;
+    if (root == null) {
+      if (other.root != null) return false;
+    } else if (!root.equals(other.root)) return false;
+    if (sendingExchange == null) {
+      if (other.sendingExchange != null) return false;
+    } else if (!sendingExchange.equals(other.sendingExchange)) return false;
+    if (stats == null) {
+      if (other.stats != null) return false;
+    } else if (!stats.equals(other.stats)) return false;
+    return true;
+  }
+
+  
+ 
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
new file mode 100644
index 0000000..3f7c3a9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentPlanningSet.java
@@ -0,0 +1,61 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.ops.QueryContext;
+
+public class FragmentPlanningSet implements Iterable<FragmentWrapper>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentPlanningSet.class);
+  
+  private Map<FragmentNode, FragmentWrapper> fragmentMap;
+  private int majorFragmentIdIndex = 0;
+  private QueryContext context;
+  
+  public FragmentPlanningSet(QueryContext context){
+    this.context = context;
+  }
+  
+  public void setStats(FragmentNode node, FragmentStats stats){
+    get(node).setStats(stats);
+  }
+
+  public void addAffinity(FragmentNode n, DrillbitEndpoint endpoint, float affinity){
+    get(n).addEndpointAffinity(endpoint, affinity);
+  }
+  
+  public void setWidth(FragmentNode n, int width){
+    get(n).setWidth(width);
+  }
+  
+  private FragmentWrapper get(FragmentNode node){
+    FragmentWrapper info = fragmentMap.get(node);
+    if(info == null) info = new FragmentWrapper(node, majorFragmentIdIndex++);
+    return info;
+  }
+
+  @Override
+  public Iterator<FragmentWrapper> iterator() {
+    return this.fragmentMap.values().iterator();
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
new file mode 100644
index 0000000..d551aa4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentRunnable.java
@@ -0,0 +1,124 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.CancelableQuery;
+import org.apache.drill.exec.foreman.StatusProvider;
+import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentConverter;
+import org.apache.drill.exec.ops.FragmentRoot;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+
+/**
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
+ * messages.
+ */
+public class FragmentRunnable implements Runnable, CancelableQuery, StatusProvider {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRunnable.class);
+
+  private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
+  private final FragmentRoot root;
+  private final FragmentContext context;
+
+  public FragmentRunnable(DrillbitContext dbContext, long fragmentId) throws FragmentSetupException {
+    PlanFragment fragment = dbContext.getCache().getFragment(fragmentId);
+    if (fragment == null) throw new FragmentSetupException(String.format("The provided fragment id [%d] was unknown.", fragmentId));
+    this.context = new FragmentContext(dbContext, fragment);
+    this.root = FragmentConverter.getFragment(this.context);
+  }
+
+  @Override
+  public FragmentStatus getStatus() {
+    return FragmentStatus.newBuilder() //
+        .setBatchesCompleted(context.batchesCompleted.get()) //
+        .setDataProcessed(context.dataProcessed.get()) //
+        .setMemoryUse(context.getAllocator().getAllocatedMemory()) //
+        .build();
+  }
+
+  @Override
+  public boolean cancel(long queryId) {
+    if (context.getFragment().getQueryId() == queryId) {
+      state.set(FragmentState.CANCELLED_VALUE);
+      return true;
+    }
+    return false;
+  }
+
+  private void fail(Throwable cause){
+    context.fail(cause);
+    state.set(FragmentState.FAILED_VALUE);
+  }
+  
+  @Override
+  public void run() {
+    if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+      fail(new RuntimeException(String.format("Run was called when fragment was in %s state.  FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+      return;
+    }
+    
+    Timer.Context t = context.fragmentTime.time();
+    
+    // setup the query.
+    try{
+      root.setup();
+    }catch(FragmentSetupException e){
+      
+      context.fail(e);
+      return;
+    }
+    
+    // run the query.
+    try{
+      while(state.get() == FragmentState.RUNNING_VALUE){
+        if(!root.next()){
+          updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+        }
+      }
+      t.stop();
+    }catch(Exception ex){
+      fail(ex);
+    }
+    
+  }
+
+  private boolean updateState(FragmentState current, FragmentState update, boolean exceptionOnFailure) {
+    boolean success = state.compareAndSet(current.getNumber(), update.getNumber());
+    if (!success && exceptionOnFailure) {
+      context.fail(new RuntimeException(String.format(
+          "State was different than expected.  Attempting to update state from %s to %s however current state was %s.",
+          current.name(), update.name(), FragmentState.valueOf(state.get()))));
+      return false;
+    }
+
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
new file mode 100644
index 0000000..168072a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentScheduler.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.exec.foreman.ResourceRequest;
+import org.apache.drill.exec.foreman.ResourceRequest.ResourceAllocation;
+
+import com.google.common.util.concurrent.ListenableFutureTask;
+
+public class FragmentScheduler {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentScheduler.class);
+  
+  public void getRunningResources(ResourceRequest resources, Runnable listener){
+    // request the resource.
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
new file mode 100644
index 0000000..512b5d0
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStats.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.OperatorCost;
+
+public class FragmentStats {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStats.class);
+  
+  private int maxWidth = Integer.MAX_VALUE;
+  private float networkCost; 
+  private float diskCost;
+  private float memoryCost;
+  private float cpuCost;
+  
+  public void addMaxWidth(int width){
+    maxWidth = Math.min(maxWidth, width);
+  }
+  
+  public void addCost(OperatorCost cost){
+    networkCost += cost.getNetwork();
+    diskCost += cost.getDisk();
+    memoryCost += cost.getMemory();
+    cpuCost += cost.getCpu();
+  }
+
+  public int getMaxWidth() {
+    return maxWidth;
+  }
+
+  public float getNetworkCost() {
+    return networkCost;
+  }
+
+  public float getDiskCost() {
+    return diskCost;
+  }
+
+  public float getMemoryCost() {
+    return memoryCost;
+  }
+
+  public float getCpuCost() {
+    return cpuCost;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
new file mode 100644
index 0000000..a0dcde3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentStatsCollector.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Scan;
+import org.apache.drill.common.physical.pop.base.Store;
+import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+
+public class FragmentStatsCollector implements FragmentVisitor<Void, FragmentStats> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatsCollector.class);
+
+  //private HashMap<FragmentNode, FragmentStats> nodeStats = Maps.newHashMap();
+  private final StatsCollector opCollector = new StatsCollector();
+  private final FragmentPlanningSet planningSet;
+  
+  public FragmentStatsCollector(FragmentPlanningSet planningSet){
+    this.planningSet = planningSet;
+  }
+  
+  @Override
+  public Void visit(FragmentNode n, FragmentStats stats) {
+    Preconditions.checkNotNull(stats);
+    Preconditions.checkNotNull(n);
+
+    n.getRoot().accept(opCollector, stats);
+
+    // sending exchange.
+    Exchange sending = n.getSendingExchange();
+    if (sending != null) {
+      stats.addCost(sending.getAggregateSendCost());
+      stats.addMaxWidth(sending.getMaxSendWidth());
+    }
+
+    // receivers...
+    for (ExchangeFragmentPair child : n) {
+      // add exchange receive cost.
+      Exchange receivingExchange = child.getExchange();
+      stats.addCost(receivingExchange.getAggregateReceiveCost());
+
+      FragmentStats childStats = new FragmentStats();
+      FragmentNode childNode = child.getNode();
+      childNode.accept(this, childStats);
+    }
+    
+    // store the stats for later use.
+    planningSet.setStats(n, stats);
+    
+    return null;
+  }
+
+  public void collectStats(FragmentNode rootFragment) {
+    FragmentStats s = new FragmentStats();
+    rootFragment.accept(this, s);
+  }
+
+  private class StatsCollector extends AbstractPhysicalVisitor<Void, FragmentStats, RuntimeException> {
+
+    @Override
+    public Void visitExchange(Exchange exchange, FragmentStats stats) throws RuntimeException {
+      // don't do anything here since we'll add the exchange costs elsewhere. We also don't want navigate across
+      // exchanges since they are separate fragments.
+      return null;
+    }
+
+    @Override
+    public Void visitScan(Scan<?> scan, FragmentStats stats) {
+      stats.addMaxWidth(scan.getReadEntries().size());
+      return super.visitScan(scan, stats);
+    }
+
+    @Override
+    public Void visitStore(Store store, FragmentStats stats) {
+      stats.addMaxWidth(store.getMaxWidth());
+      return super.visitStore(store, stats);
+    }
+
+    @Override
+    public Void visitUnknown(PhysicalOperator op, FragmentStats stats) {
+      stats.addCost(op.getCost());
+      for (PhysicalOperator child : op) {
+        child.accept(this, stats);
+      }
+      return null;
+    }
+
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
new file mode 100644
index 0000000..12d2b9f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentVisitor.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+public interface FragmentVisitor<T, V> {
+  public T visit(FragmentNode n, V extra);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
new file mode 100644
index 0000000..a1e4f81
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentWrapper.java
@@ -0,0 +1,127 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.drill.common.physical.EndpointAffinity;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class FragmentWrapper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWrapper.class);
+
+  private final FragmentNode node;
+  private final int majorFragmentId;
+  private int width = -1;
+  private FragmentStats stats;
+  private boolean endpointsAssigned;
+  private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+
+  // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the
+  // same fragment multiple times to the same endpoint.
+  private List<DrillbitEndpoint> endpoints = Lists.newLinkedList();
+
+  public FragmentWrapper(FragmentNode node, int majorFragmentId) {
+    this.majorFragmentId = majorFragmentId;
+    this.node = node;
+  }
+
+  public FragmentStats getStats() {
+    return stats;
+  }
+
+  public void setStats(FragmentStats stats) {
+    this.stats = stats;
+  }
+
+  public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    Preconditions.checkState(!endpointsAssigned);
+    EndpointAffinity ea = endpointAffinity.get(endpoint);
+    if (ea == null) {
+      ea = new EndpointAffinity(endpoint);
+      endpointAffinity.put(endpoint, ea);
+    }
+
+    ea.addAffinity(affinity);
+    endpointAffinity.put(endpoint, ea);
+  }
+
+  public int getMajorFragmentId() {
+    return majorFragmentId;
+  }
+
+  public int getWidth() {
+    return width;
+  }
+
+  public void setWidth(int width) {
+    Preconditions.checkState(width == -1);
+    this.width = width;
+  }
+
+  public FragmentNode getNode() {
+    return node;
+  }
+
+  public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) {
+    Preconditions.checkState(!endpointsAssigned);
+
+    endpointsAssigned = true;
+    
+    List<EndpointAffinity> values = Lists.newArrayList();
+    values.addAll(endpointAffinity.values());
+    
+    if(values.size() == 0){
+      final int div = allPossible.size();
+      int start = ThreadLocalRandom.current().nextInt(div);
+      // round robin with random start.
+      for(int i = start; i < start + width; i++){
+        endpoints.add(values.get(i % div).getEndpoint());
+      }
+    }else if(values.size() < width){
+      throw new NotImplementedException("Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
+    }else{
+      // get nodes with highest affinity.
+      Collections.sort(values);
+      values = Lists.reverse(values);
+      for (int i = 0; i < width; i++) {
+        endpoints.add(values.get(i).getEndpoint());
+      }
+    }
+
+    node.getSendingExchange().setupSenders(endpoints);
+    for (ExchangeFragmentPair e : node.getReceivingExchangePairs()) {
+      e.getExchange().setupReceivers(endpoints);
+    }
+  }
+
+  public DrillbitEndpoint getAssignedEndpoint(int minorFragmentId) {
+    Preconditions.checkState(endpointsAssigned);
+    return this.endpoints.get(minorFragmentId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
new file mode 100644
index 0000000..5f67617
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FragmentingPhysicalVisitor.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+/**
+ * Responsible for breaking a plan into its constituent Fragments.
+ */
+public class FragmentingPhysicalVisitor extends AbstractPhysicalVisitor<FragmentNode, FragmentNode, FragmentSetupException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentingPhysicalVisitor.class);
+
+  private FragmentNode rootFragment = new FragmentNode();
+  
+  public FragmentingPhysicalVisitor(){
+  }
+  
+  
+  @Override
+  public FragmentNode visitExchange(Exchange exchange, FragmentNode value) throws FragmentSetupException {
+//    logger.debug("Visiting Exchange {}", exchange);
+    if(value == null) throw new FragmentSetupException("The simple fragmenter was called without a FragmentBuilder value.  This will only happen if the initial call to SimpleFragmenter is by a Exchange node.  This should never happen since an Exchange node should never be the root node of a plan.");
+    FragmentNode next = getNextBuilder();
+    value.addReceiveExchange(exchange, next);
+    next.addSendExchange(exchange);
+    exchange.getChild().accept(this, getNextBuilder());
+    return value;
+  }
+  
+  @Override
+  public FragmentNode visitUnknown(PhysicalOperator op, FragmentNode value)  throws FragmentSetupException{
+//    logger.debug("Visiting Other {}", op);
+    value = ensureBuilder(value);
+    value.addOperator(op);
+    for(PhysicalOperator child : op){
+      child.accept(this, value);
+    }
+    return value;
+  }
+  
+  private FragmentNode ensureBuilder(FragmentNode value) throws FragmentSetupException{
+    if(value != null){
+      return value;
+    }else{
+      return rootFragment;
+    }
+  }
+  
+  public FragmentNode getNextBuilder(){
+    return new FragmentNode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
new file mode 100644
index 0000000..d1c85cb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/MaterializedFragment.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.OperatorCost;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+public class MaterializedFragment {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MaterializedFragment.class);
+
+  final PhysicalOperator root;
+  final DrillbitEndpoint endpoint;
+  final long queryId;
+  final int majorFragmentId;
+  final int minorFragmentId;
+  final OperatorCost cost;
+
+  public MaterializedFragment(PhysicalOperator root, DrillbitEndpoint endpoint, long queryId, int majorFragmentId,
+      int minorFragmentId, OperatorCost cost) {
+    super();
+    this.root = root;
+    this.endpoint = endpoint;
+    this.queryId = queryId;
+    this.majorFragmentId = majorFragmentId;
+    this.minorFragmentId = minorFragmentId;
+    this.cost = cost;
+  }
+
+  public PhysicalOperator getRoot() {
+    return root;
+  }
+
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  public long getQueryId() {
+    return queryId;
+  }
+
+  public int getMajorFragmentId() {
+    return majorFragmentId;
+  }
+
+  public int getMinorFragmentId() {
+    return minorFragmentId;
+  }
+
+  public OperatorCost getCost() {
+    return cost;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
new file mode 100644
index 0000000..ff31cd9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.io.IOException;
+
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+public class PhysicalPlanReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
+
+  private final ObjectReader reader;
+
+  public PhysicalPlanReader(ObjectMapper mapper, DrillbitEndpoint endpoint) {
+    InjectableValues injectables = new InjectableValues.Std() //
+        .addValue(DrillbitEndpoint.class, endpoint); //
+    this.reader = mapper.reader(PhysicalPlan.class).with(injectables);
+  }
+
+  public PhysicalPlan read(String json) throws JsonProcessingException, IOException {
+    return reader.readValue(json);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
new file mode 100644
index 0000000..ff81d90
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ScanFinder.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.common.physical.pop.base.AbstractPhysicalVisitor;
+import org.apache.drill.common.physical.pop.base.Exchange;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.Store;
+
+public class ScanFinder extends AbstractPhysicalVisitor<Boolean, Void, RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanFinder.class);
+
+  private static final ScanFinder finder = new ScanFinder();
+  
+  private ScanFinder(){}
+  
+  @Override
+  public Boolean visitExchange(Exchange exchange, Void value) throws RuntimeException {
+    return false;
+  }
+
+  @Override
+  public Boolean visitStore(Store store, Void value) throws RuntimeException {
+    return true;
+  }
+
+  @Override
+  public Boolean visitUnknown(PhysicalOperator op, Void value) throws RuntimeException {
+    for(PhysicalOperator child : op){
+      if(child.accept(this,  null)) return true;
+    }
+    return false;
+  }
+  
+  public static boolean containsScan(PhysicalOperator op){
+    return op.accept(finder, null);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
new file mode 100644
index 0000000..d1c3add
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
@@ -0,0 +1,54 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+/**
+ * Parallelization is based on available nodes with source or target data.  Nodes that are "overloaded" are excluded from execution.
+ */
+public class SimpleExecPlanner implements ExecPlanner{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleExecPlanner.class);
+  
+  private FragmentingPhysicalVisitor fragmenter = new FragmentingPhysicalVisitor();
+  private SimpleParallelizer parallelizer = new SimpleParallelizer();
+
+  @Override
+  public QueryWorkUnit getWorkUnit(QueryContext context, PhysicalPlan plan, int maxWidth) throws FragmentSetupException {
+    
+    // get the root physical operator and split the plan into sub fragments.
+    PhysicalOperator root = plan.getSortedOperators(false).iterator().next();
+    FragmentNode fragmentRoot = root.accept(fragmenter, null);
+    
+    // generate a planning set and collect stats.
+    FragmentPlanningSet planningSet = new FragmentPlanningSet(context);
+    FragmentStatsCollector statsCollector = new FragmentStatsCollector(planningSet);
+    statsCollector.collectStats(fragmentRoot);
+    
+    return parallelizer.getFragments(context, fragmentRoot, planningSet, maxWidth);
+    
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
new file mode 100644
index 0000000..a52abaa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleParallelizer.java
@@ -0,0 +1,147 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.foreman.QueryWorkUnit;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.FragmentMaterializer.IndexedFragmentNode;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
+public class SimpleParallelizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
+
+  private final FragmentMaterializer materializer = new FragmentMaterializer();
+
+  /**
+   * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go
+   * beyond the global max width.
+   * 
+   * @param context
+   *          The current QueryContext.
+   * @param planningSet
+   *          The set of queries with collected statistics that we'll work with.
+   * @param globalMaxWidth
+   *          The maximum level or paralellization any stage of the query can do. Note that while this might be the
+   *          number of active Drillbits, realistically, this could be well beyond that number of we want to do things
+   *          like speed results return.
+   * @return The list of generatoe PlanFragment protobuf objects to be assigned out to the individual nodes.
+   * @throws FragmentSetupException
+   */
+  public QueryWorkUnit getFragments(QueryContext context, FragmentNode rootNode, FragmentPlanningSet planningSet,
+      int globalMaxWidth) throws FragmentSetupException {
+    assignEndpoints(context.getActiveEndpoints(), planningSet, globalMaxWidth);
+    return generateWorkUnit(context.getQueryId(), context.getMapper(), rootNode, planningSet);
+  }
+
+  private QueryWorkUnit generateWorkUnit(long queryId, ObjectMapper mapper, FragmentNode rootNode,
+      FragmentPlanningSet planningSet) throws FragmentSetupException {
+
+    List<PlanFragment> fragments = Lists.newArrayList();
+
+    PlanFragment rootFragment = null;
+
+    // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
+    // assigned before we can materialize, so we start a new loop here rather than utilizing the previous one.
+    for (FragmentWrapper info : planningSet) {
+
+      FragmentNode node = info.getNode();
+      FragmentStats stats = node.getStats();
+      PhysicalOperator abstractRoot = node.getRoot();
+      boolean isRootNode = rootNode == node;
+
+      if (isRootNode && info.getWidth() != 1)
+        throw new FragmentSetupException(
+            String
+                .format(
+                    "Failure while trying to setup fragment.  The root fragment must always have parallelization one.  In the current case, the width was set to %d.",
+                    info.getWidth()));
+      // a fragment is self driven if it doesn't rely on any other exchanges.
+      boolean selfDriven = node.getReceivingExchangePairs().size() == 0;
+
+      // Create a minorFragment for each major fragment.
+      for (int minorFragmentId = 0; minorFragmentId < info.getWidth(); minorFragmentId++) {
+        IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, info);
+        PhysicalOperator root = abstractRoot.accept(materializer, iNode);
+
+        // get plan as JSON
+        String plan;
+        try {
+          plan = mapper.writeValueAsString(root);
+        } catch (JsonProcessingException e) {
+          throw new FragmentSetupException("Failure while trying to convert fragment into json.", e);
+        }
+
+        PlanFragment fragment = PlanFragment.newBuilder() //
+            .setCpuCost(stats.getCpuCost()) //
+            .setDiskCost(stats.getDiskCost()) //
+            .setMemoryCost(stats.getMemoryCost()) //
+            .setNetworkCost(stats.getNetworkCost()) //
+            .setFragmentJson(plan) //
+            .setMinorFragmentId(minorFragmentId) //
+            .setMajorFragmentId(info.getMajorFragmentId()).setQueryId(queryId) //
+            .setAssignment(info.getAssignedEndpoint(minorFragmentId)).setSelfDriven(selfDriven).build();
+
+        if (isRootNode) {
+          rootFragment = fragment;
+        } else {
+          fragments.add(fragment);
+        }
+      }
+    }
+
+    return new QueryWorkUnit(rootFragment, fragments);
+
+  }
+
+  private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, FragmentPlanningSet planningSet,
+      int globalMaxWidth) {
+    // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
+    // cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this
+    // could be based on endpoint load)
+    for (FragmentWrapper info : planningSet) {
+
+      FragmentStats stats = info.getStats();
+
+      // figure out width.
+      int width = Math.min(stats.getMaxWidth(), globalMaxWidth);
+      float diskCost = stats.getDiskCost();
+
+      // TODO: right now we'll just assume that each task is cost 1 so we'll set the breadth at the lesser of the number
+      // of tasks or the maximum width of the fragment.
+      if (diskCost < width) {
+        width = (int) diskCost;
+      }
+
+      if (width < 1) width = 1;
+      info.setWidth(width);
+
+      // figure out endpoint assignments. also informs the exchanges about their respective endpoints.
+      info.assignEndpoints(allNodes);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
new file mode 100644
index 0000000..562d109
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/NWayOrderingReceiver.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.receiver;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractReceiver;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("nway-ordering-receiver")
+public class NWayOrderingReceiver extends AbstractReceiver{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NWayOrderingReceiver.class);
+
+  @Override
+  public List<DrillbitEndpoint> getProvidingEndpoints() {
+    return null;
+  }
+
+  @Override
+  public boolean supportsOutOfOrderExchange() {
+    return false;
+  }
+
+  @Override
+  public int getSenderCount() {
+    return 0;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
new file mode 100644
index 0000000..487c645
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/receiver/RandomReceiver.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.receiver;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractReceiver;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("receiver-random")
+public class RandomReceiver extends AbstractReceiver{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
+
+  @Override
+  public List<DrillbitEndpoint> getProvidingEndpoints() {
+    return null;
+  }
+
+  @Override
+  public boolean supportsOutOfOrderExchange() {
+    return false;
+  }
+
+  @Override
+  public int getSenderCount() {
+    return 0;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return null;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
new file mode 100644
index 0000000..b0fb51c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/pop/sender/HashPartitionSender.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop.sender;
+
+import java.util.List;
+
+import org.apache.drill.common.physical.pop.base.AbstractSender;
+import org.apache.drill.common.physical.pop.base.PhysicalOperator;
+import org.apache.drill.common.physical.pop.base.PhysicalVisitor;
+import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("hash-partition-sender")
+public class HashPartitionSender extends AbstractSender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashPartitionSender.class);
+
+  public HashPartitionSender(PhysicalOperator child) {
+    super(child);
+  }
+
+
+  @Override
+  public List<DrillbitEndpoint> getDestinations() {
+    return null;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return null;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index 1d32340..d3e4b23 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -32,10 +32,12 @@ import com.google.common.collect.Lists;
 public class BatchSchema implements Iterable<MaterializedField>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
   
-  private List<MaterializedField> fields = Lists.newArrayList();
+  private final List<MaterializedField> fields;
+  private final boolean hasSelectionVector;
   
-  private BatchSchema(List<MaterializedField> fields) {
+  private BatchSchema(boolean hasSelectionVector, List<MaterializedField> fields) {
     this.fields = fields;
+    this.hasSelectionVector = hasSelectionVector;
   }
 
   @Override
@@ -59,10 +61,13 @@ public class BatchSchema implements Iterable<MaterializedField>{
     private IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
     private IntObjectOpenHashMap<MaterializedField> expectedFields = new IntObjectOpenHashMap<MaterializedField>();
     
+    private boolean hasSelectionVector;
+    
     public BatchSchemaBuilder(BatchSchema expected){
       for(MaterializedField f: expected){
         expectedFields.put(f.getFieldId(), f);
       }
+      hasSelectionVector = expected.hasSelectionVector;
     }
     
     public BatchSchemaBuilder(){
@@ -80,6 +85,10 @@ public class BatchSchema implements Iterable<MaterializedField>{
       addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
     }
     
+    public void setSelectionVector(boolean hasSelectionVector){
+      this.hasSelectionVector = hasSelectionVector;
+    }
+    
     private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
       MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
       if(expectedFields != null){
@@ -99,13 +108,13 @@ public class BatchSchema implements Iterable<MaterializedField>{
       setTypedField(fieldId, type, nullable, mode, valueClass);
     }
     
-    public void addVector(ValueVector<?> v){
-      
-    }
-    
-    public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
-      
-    }
+//    public void addVector(ValueVector<?> v){
+//      
+//    }
+//    
+//    public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){
+//      
+//    }
     
     
     public BatchSchema buildAndClear() throws SchemaChangeException{
@@ -116,7 +125,7 @@ public class BatchSchema implements Iterable<MaterializedField>{
         if(f != null) fieldList.add(f);
       }
       Collections.sort(fieldList);
-      return new BatchSchema(fieldList);
+      return new BatchSchema(this.hasSelectionVector, fieldList);
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
index 3cadf89..2e941a2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java
@@ -70,6 +70,10 @@ public class MaterializedField implements Comparable<MaterializedField>{
     check("valueMode", this.mode, expected.mode);
   }
 
+  public MaterializedField getNullableVersion(Class<?> valueClass){
+    return new MaterializedField(fieldId, type, true, mode, valueClass);
+  }
+  
   @Override
   public int compareTo(MaterializedField o) {
     return Integer.compare(this.fieldId, o.fieldId);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
index 735493d..912e02d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/AbstractFixedValueVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.record.vector;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 
 /**
  * Abstract class that fixed value vectors are derived from.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/f0be80dc/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
index 33a81e5..8d524b2 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BaseValueVector.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.record.vector;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.DeadBuf;
 
 public abstract class BaseValueVector<T extends BaseValueVector<T>> implements ValueVector<T>{
@@ -37,7 +37,9 @@ public abstract class BaseValueVector<T extends BaseValueVector<T>> implements V
 
   public final void allocateNew(int valueCount){
     int allocationSize = getAllocationSize(valueCount);
-    resetAllocation(valueCount, allocator.buffer(allocationSize));
+    ByteBuf newBuf =  allocator.buffer(allocationSize);
+    newBuf.retain();
+    resetAllocation(valueCount, newBuf);
   }
 
   protected abstract int getAllocationSize(int valueCount);