You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:36 UTC

[15/53] [abbrv] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
new file mode 100644
index 0000000..0044628
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorageEngine.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.StorageEngine;
+import org.apache.drill.exec.store.StorageEngine.ReadEntry;
+
+import com.google.common.collect.ListMultimap;
+
+public class MockStorageEngine extends AbstractStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
+    return null;
+  }
+
+  @Override
+  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
+    return null;
+  }
+
+  @Override
+  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
+    return null;
+  }
+
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
new file mode 100644
index 0000000..639d0d2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MockStorePOP.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("mock-store")
+public class MockStorePOP extends AbstractStore {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorePOP.class);
+
+  @JsonCreator
+  public MockStorePOP(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
+    
+  }
+
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new MockStorePOP(child);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(1,getSize().getRecordCount()*getSize().getRecordSize(),1,1);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new MockStorePOP(child);
+  }
+
+
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
new file mode 100644
index 0000000..eb77eeb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/PartitionRange.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.expression.LogicalExpression;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class PartitionRange {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitionRange.class);
+  
+  private LogicalExpression start;
+  private LogicalExpression finish;
+  
+  @JsonCreator
+  public PartitionRange(@JsonProperty("start") LogicalExpression start, @JsonProperty("finish") LogicalExpression finish) {
+    super();
+    this.start = start;
+    this.finish = finish;
+  }
+
+  public LogicalExpression getStart() {
+    return start;
+  }
+
+  public LogicalExpression getFinish() {
+    return finish;
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
new file mode 100644
index 0000000..e869393
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Project.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("project")
+public class Project extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Project.class);
+
+  private final List<NamedExpression> exprs;
+  
+  @JsonCreator
+  public Project(@JsonProperty("exprs") List<NamedExpression> exprs, @JsonProperty("child") PhysicalOperator child) {
+    super(child);
+    this.exprs = exprs;
+  }
+
+  public List<NamedExpression> getExprs() {
+    return exprs;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitProject(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
+  }
+  
+  @Override
+  public Size getSize() {
+    //TODO: This should really change the row width...
+    return child.getSize();
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Project(exprs, child);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
new file mode 100644
index 0000000..ed41586
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RandomReceiver.java
@@ -0,0 +1,83 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractReceiver;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("random-receiver")
+public class RandomReceiver extends AbstractReceiver{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
+
+  private List<DrillbitEndpoint> senders;
+  
+  @JsonCreator
+  public RandomReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId, @JsonProperty("senders") List<DrillbitEndpoint> senders){
+    super(oppositeMajorFragmentId);
+    this.senders = senders;
+  }
+  
+  @Override
+  public List<DrillbitEndpoint> getProvidingEndpoints() {
+    return senders;
+  }
+
+  @Override
+  public boolean supportsOutOfOrderExchange() {
+    return true;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    //TODO: deal with receiver cost through exchange.
+    return new OperatorCost(1,1,1,1);
+  }
+
+  
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitRandomReceiver(this, value);
+  }
+
+  @Override
+  public Size getSize() {
+    //TODO: deal with size info through exchange.
+    return new Size(1,1);
+  }
+
+  @Override
+  public int getOppositeMajorFragmentId() {
+    return 0;
+  }
+
+  
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
new file mode 100644
index 0000000..7d64dba
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/RangeSender.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("range-sender")
+public class RangeSender extends AbstractSender{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RangeSender.class);
+
+  List<EndpointPartition> partitions;
+  
+  @JsonCreator
+  public RangeSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("partitions") List<EndpointPartition> partitions) {
+    super(oppositeMajorFragmentId, child);
+    this.partitions = partitions;
+  }
+
+  @Override
+  public List<DrillbitEndpoint> getDestinations() {
+    return null;
+  }
+
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new RangeSender(oppositeMajorFragmentId, child, partitions);
+  }
+
+
+  public static class EndpointPartition{
+    private final PartitionRange range;
+    private final DrillbitEndpoint endpoint;
+    
+    @JsonCreator
+    public EndpointPartition(@JsonProperty("range") PartitionRange range, @JsonProperty("endpoint") DrillbitEndpoint endpoint) {
+      super();
+      this.range = range;
+      this.endpoint = endpoint;
+    }
+    public PartitionRange getRange() {
+      return range;
+    }
+    public DrillbitEndpoint getEndpoint() {
+      return endpoint;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
new file mode 100644
index 0000000..86a201d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Screen.java
@@ -0,0 +1,106 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractStore;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Root;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("screen")
+public class Screen extends AbstractStore implements Root{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Screen.class);
+
+  private final DrillbitEndpoint endpoint;
+
+  public Screen(@JsonProperty("child") PhysicalOperator child, @JacksonInject DrillbitEndpoint endpoint) {
+    super(child);
+    this.endpoint = endpoint;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+    return Collections.singletonList(new EndpointAffinity(endpoint, 1000000000000l));
+  }
+
+  @Override
+  public int getMaxWidth() {
+    return 1;
+  }
+
+  @Override
+  public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
+    // we actually don't have to do anything since nothing should have changed. we'll check just check that things
+    // didn't get screwed up.
+    if (endpoints.size() != 1) throw new PhysicalOperatorSetupException("A Screen operator can only be assigned to a single node.");
+    DrillbitEndpoint endpoint = endpoints.iterator().next();
+    logger.debug("Endpoint this: {}, assignment: {}", this.endpoint, endpoint);
+    if (!endpoint.equals(this.endpoint)) {
+      throw new PhysicalOperatorSetupException(String.format(
+          "A Screen operator can only be assigned to its home node.  Expected endpoint %s, Actual endpoint: %s",
+          this.endpoint, endpoint));
+    }
+  }
+
+  @Override
+  public Store getSpecificStore(PhysicalOperator child, int minorFragmentId) {
+    return new Screen(child, endpoint);
+  }
+
+  @JsonIgnore
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+
+  @Override
+  public String toString() {
+    return "Screen [endpoint=" + endpoint + ", getChild()=" + getChild() + "]";
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return new OperatorCost(0, 0, 1000, child.getSize().getRecordCount());
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Screen(child, endpoint);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitScreen(this, value);
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
new file mode 100644
index 0000000..79d937a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleSender.java
@@ -0,0 +1,78 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSender;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Sender that pushes all data to a single destination node.
+ */
+@JsonTypeName("single-sender")
+public class SingleSender extends AbstractSender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSender.class);
+
+  private final DrillbitEndpoint destination;
+  
+  @JsonCreator
+  public SingleSender(@JsonProperty("receiver-major-fragment") int oppositeMajorFragmentId, @JsonProperty("child") PhysicalOperator child, @JsonProperty("destination") DrillbitEndpoint destination) {
+    super(oppositeMajorFragmentId, child);
+    this.destination = destination;
+  }
+
+  @Override
+  @JsonIgnore
+  public List<DrillbitEndpoint> getDestinations() {
+    return Collections.singletonList(destination);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    long recordSize = child.getSize().getRecordSize() * child.getSize().getRecordCount();
+    return new OperatorCost((float) recordSize, recordSize, 0, child.getSize().getRecordCount()/(1<<16));
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new SingleSender(oppositeMajorFragmentId, child, destination);
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSingleSender(this, value);
+  }
+ 
+
+  public DrillbitEndpoint getDestination() {
+    return destination;
+  }
+ 
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
new file mode 100644
index 0000000..e4ece6b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Sort.java
@@ -0,0 +1,86 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.defs.OrderDef;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("sort")
+public class Sort extends AbstractSingle{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Sort.class);
+  
+  private final List<OrderDef> orderings;
+  private boolean reverse = false;
+  
+  @JsonCreator
+  public Sort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<OrderDef> orderings, @JsonProperty("reverse") boolean reverse) {
+    super(child);
+    this.orderings = orderings;
+    this.reverse = reverse;
+  }
+
+  public List<OrderDef> getOrderings() {
+    return orderings;
+  }
+
+  public boolean getReverse() {
+    return reverse;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E{
+    return physicalVisitor.visitSort(this, value);
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    Size childSize = child.getSize();
+    long n = childSize.getRecordCount();
+    long width = childSize.getRecordSize();
+
+    //TODO: Magic Number, let's assume 1/10 of data can fit in memory. 
+    int k = 10;
+    long n2 = n/k;
+    double cpuCost = 
+        k * n2 * (Math.log(n2)/Math.log(2)) + // 
+        n * (Math.log(k)/Math.log(2));
+    double diskCost = n*width*2;
+    
+    return new OperatorCost(0, (float) diskCost, (float) n2*width, (float) cpuCost);
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new Sort(child, orderings, reverse);
+  }
+
+    
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
new file mode 100644
index 0000000..56467ce
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -0,0 +1,79 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.AbstractExchange;
+import org.apache.drill.exec.physical.base.ExchangeCost;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Receiver;
+import org.apache.drill.exec.physical.base.Sender;
+import org.apache.drill.exec.physical.base.Size;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("union-exchange")
+public class UnionExchange extends AbstractExchange{
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionExchange.class);
+
+  private List<DrillbitEndpoint> senderLocations;
+  private DrillbitEndpoint destinationLocation;
+  
+  public UnionExchange(@JsonProperty("child") PhysicalOperator child) {
+    super(child);
+  }
+  
+  @Override
+  public void setupSenders(List<DrillbitEndpoint> senderLocations) {
+    this.senderLocations = senderLocations;
+  }
+
+  @Override
+  protected void setupReceivers(List<DrillbitEndpoint> receiverLocations) throws PhysicalOperatorSetupException {
+    if(receiverLocations.size() != 1) throw new PhysicalOperatorSetupException("A Union Exchange only supports a single receiver endpoint.");
+    this.destinationLocation = receiverLocations.iterator().next();
+  }
+
+  @Override
+  public Sender getSender(int minorFragmentId, PhysicalOperator child) {
+    return new SingleSender(this.receiverMajorFragmentId, child, destinationLocation);
+  }
+
+  @Override
+  public Receiver getReceiver(int minorFragmentId) {
+    return new RandomReceiver(this.senderMajorFragmentId, senderLocations);
+  }
+
+  @Override
+  public int getMaxSendWidth() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+    return new UnionExchange(child);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
new file mode 100644
index 0000000..9a7df56
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface BatchCreator<T extends PhysicalOperator> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
+  
+  public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
new file mode 100644
index 0000000..6592ca1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilterRecordBatch.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public abstract class FilterRecordBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
+
+  private RecordBatch incoming;
+  private SelectionVector selectionVector;
+  private BatchSchema schema;
+  private FilteringRecordBatchTransformer transformer;
+  private int outstanding;
+
+  public FilterRecordBatch(RecordBatch batch) {
+    this.incoming = batch;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return incoming.getContext();
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return 0;
+  }
+
+  @Override
+  public void kill() {
+    incoming.kill();
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    return null;
+  }
+
+  abstract int applyFilter(SelectionVector vector, int count);
+
+  /**
+   * Release all assets.
+   */
+  private void close() {
+
+  }
+
+  @Override
+  public IterOutcome next() {
+    while (true) {
+      IterOutcome o = incoming.next();
+      switch (o) {
+      case OK_NEW_SCHEMA:
+        transformer = incoming.getContext().getFilteringExpression(null);
+        schema = transformer.getSchema();
+        // fall through to ok.
+      case OK:
+
+      case NONE:
+      case STOP:
+        close();
+        return IterOutcome.STOP;
+      }
+
+      if (outstanding > 0) {
+        // move data to output location.
+
+        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
+
+        }
+      }
+
+      // make sure the bit vector is as large as the current record batch.
+      if (selectionVector.capacity() < incoming.getRecordCount()) {
+        selectionVector.allocateNew(incoming.getRecordCount());
+      }
+
+      return null;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
new file mode 100644
index 0000000..191521a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/FilteringRecordBatchTransformer.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.SelectionVector;
+
+public abstract class FilteringRecordBatchTransformer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
+  
+  final RecordBatch incoming;
+  final SelectionVector selectionVector;
+  final BatchSchema schema;
+  
+  public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector selectionVector) {
+    super();
+    this.incoming = incoming;
+    this.selectionVector = selectionVector;
+    this.schema = innerSetup();
+  }
+
+  public abstract BatchSchema innerSetup();
+  
+  /**
+   * Applies the filter to the selection index.  Ignores any values in the selection vector, instead creating a.
+   * @return
+   */
+  public abstract int apply();
+  
+  /**
+   * Applies the filter to the selection index.  Utilizes the existing selection index and only evaluates on those records.
+   * @return
+   */
+  public abstract int applyWithSelection();
+
+  public BatchSchema getSchema() {
+    return schema;
+  }
+  
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
new file mode 100644
index 0000000..d98c107
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -0,0 +1,102 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Scan;
+import org.apache.drill.exec.physical.config.MockScanBatchCreator;
+import org.apache.drill.exec.physical.config.MockScanPOP;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentContext, ExecutionSetupException>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
+
+  private MockScanBatchCreator msc = new MockScanBatchCreator();
+  private ScreenCreator sc = new ScreenCreator();
+  private RandomReceiverCreator rrc = new RandomReceiverCreator();
+  private SingleSenderCreator ssc = new SingleSenderCreator();
+  private RootExec root = null;
+  
+  private ImplCreator(){}
+  
+  public RootExec getRoot(){
+    return root;
+  }
+  
+  
+  @Override
+  public RecordBatch visitScan(Scan<?> scan, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkNotNull(scan);
+    Preconditions.checkNotNull(context);
+    
+    if(scan instanceof MockScanPOP){
+      return msc.getBatch(context, (MockScanPOP) scan, Collections.<RecordBatch> emptyList());
+    }else{
+      return super.visitScan(scan, context);  
+    }
+    
+  }
+
+  @Override
+  public RecordBatch visitScreen(Screen op, FragmentContext context) throws ExecutionSetupException {
+    Preconditions.checkArgument(root == null);
+    root = sc.getRoot(context, op, getChildren(op, context));
+    return null;
+  }
+
+  
+  
+  @Override
+  public RecordBatch visitSingleSender(SingleSender op, FragmentContext context) throws ExecutionSetupException {
+    root = ssc.getRoot(context, op, getChildren(op, context));
+    return null;
+  }
+
+  @Override
+  public RecordBatch visitRandomReceiver(RandomReceiver op, FragmentContext context) throws ExecutionSetupException {
+    return rrc.getBatch(context, op, null);
+  }
+
+  private List<RecordBatch> getChildren(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+    List<RecordBatch> children = Lists.newArrayList();
+    for(PhysicalOperator child : op){
+      children.add(child.accept(this, context));
+    }
+    return children;
+  }
+  
+  public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException{
+    ImplCreator i = new ImplCreator();
+    root.accept(i, context);
+    if(i.root == null) throw new ExecutionSetupException("The provided fragment did not have a root node that correctly created a RootExec value.");
+    return i.getRoot();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
new file mode 100644
index 0000000..ce0cf66
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public interface OutputMutator {
+  public void removeField(int fieldId) throws SchemaChangeException;
+  public void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException ;
+  public void setNewSchema() throws SchemaChangeException ;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
new file mode 100644
index 0000000..9995bc2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/PhysicalConfig.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Target({ElementType.TYPE})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface PhysicalConfig {
+  Class<?> value();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
new file mode 100644
index 0000000..4b991f8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.RandomReceiver;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.batch.RawBatchBuffer;
+
+public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiverCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, RandomReceiver receiver, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children == null || children.isEmpty();
+    IncomingBuffers bufHolder = context.getBuffers();
+    assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
+    
+    RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+    assert buffers.length == 1;
+    RawBatchBuffer buffer = buffers[0];
+    return new WireRecordBatch(context, buffer);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
new file mode 100644
index 0000000..80def05
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface RootCreator<T extends PhysicalOperator> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
+  
+  public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
new file mode 100644
index 0000000..3f8aac7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+
+/**
+ * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
+ * output nodes and storage nodes.  They are there driving force behind the completion of a query.
+ */
+public interface RootExec {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
+  
+  /**
+   * Do the next batch of work.  
+   * @return Whether or not additional batches of work are necessary.  False means that this fragment is done.
+   */
+  public boolean next();
+  
+  /**
+   * Inform all children to clean up and go away.
+   */
+  public void stop();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
new file mode 100644
index 0000000..33c1e29
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -0,0 +1,172 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.SchemaBuilder;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+import com.google.common.collect.Lists;
+
+/**
+ * Record batch used for a particular scan. Operators against one or more
+ */
+public class ScanBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+
+  private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
+  private BatchSchema schema;
+  private int recordCount;
+  private boolean schemaChanged = true;
+  private final FragmentContext context;
+  private Iterator<RecordReader> readers;
+  private RecordReader currentReader;
+  private final Mutator mutator = new Mutator();
+
+  public ScanBatch(FragmentContext context, Iterator<RecordReader> readers)
+      throws ExecutionSetupException {
+    this.context = context;
+    this.readers = readers;
+    if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+    this.currentReader = readers.next();
+    this.currentReader.setup(mutator);
+  }
+
+  private void schemaChanged() {
+    schema = null;
+    schemaChanged = true;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public void kill() {
+    releaseAssets();
+  }
+
+  private void releaseAssets() {
+    fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
+      @Override
+      public void apply(int key, ValueVector<?> value) {
+        value.close();
+      }
+    });
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    if (fields.containsKey(fieldId)) throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d."));
+    ValueVector<?> vector = this.fields.lget();
+    if (vector.getClass().isAssignableFrom(clazz)) {
+      return (T) vector;
+    } else {
+      throw new InvalidValueAccessor(String.format(
+          "You requested a field accessor of type %s for field id %d but the actual type was %s.",
+          clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
+    }
+  }
+
+  @Override
+  public IterOutcome next() {
+    while ((recordCount = currentReader.next()) == 0) {
+      try {
+        if (!readers.hasNext()) {
+          currentReader.cleanup();
+          releaseAssets();
+          return IterOutcome.NONE;
+        }
+        currentReader.cleanup();
+        currentReader = readers.next();
+        currentReader.setup(mutator);
+      } catch (ExecutionSetupException e) {
+        this.context.fail(e);
+        releaseAssets();
+        return IterOutcome.STOP;
+      }
+    }
+
+    if (schemaChanged) {
+      schemaChanged = false;
+      return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      return IterOutcome.OK;
+    }
+  }
+
+  private class Mutator implements OutputMutator {
+    private SchemaBuilder builder = BatchSchema.newBuilder();
+    
+    public void removeField(int fieldId) throws SchemaChangeException {
+      schemaChanged();
+      ValueVector<?> v = fields.remove(fieldId);
+      if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
+      v.close();
+    }
+
+    public void addField(int fieldId, ValueVector<?> vector) {
+      schemaChanged();
+      ValueVector<?> v = fields.put(fieldId, vector);
+      vector.getField();
+      builder.addField(vector.getField());
+      if (v != null) v.close();
+    }
+
+    @Override
+    public void setNewSchema() throws SchemaChangeException {
+      ScanBatch.this.schema = this.builder.build();
+      ScanBatch.this.schemaChanged = true;
+    }
+
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this.getRecordCount(), fields);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
new file mode 100644
index 0000000..c0711db
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Screen;
+import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
+import org.apache.drill.exec.physical.impl.materialize.VectorRecordMaterializer;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+
+import com.google.common.base.Preconditions;
+
+public class ScreenCreator implements RootCreator<Screen>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
+
+  @Override
+  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+    Preconditions.checkArgument(children.size() == 1);
+    return new ScreenRoot(context, children.iterator().next());
+  }
+  
+  
+  private static class ScreenRoot implements RootExec{
+
+    final RecordBatch incoming;
+    final FragmentContext context;
+    final UserClientConnection connection;
+    private RecordMaterializer materializer;
+    
+    public ScreenRoot(FragmentContext context, RecordBatch incoming){
+      assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
+
+      this.context = context;
+      this.incoming = incoming;
+      this.connection = context.getConnection();
+    }
+    
+    @Override
+    public boolean next() {
+      IterOutcome outcome = incoming.next();
+      boolean isLast = false;
+      switch(outcome){
+      case NONE:
+      case STOP:
+        connection.sendResult(materializer.convertNext(true));
+        context.batchesCompleted.inc(1);
+        context.recordsCompleted.inc(incoming.getRecordCount());
+        return false;
+        
+      case OK_NEW_SCHEMA:
+        materializer = new VectorRecordMaterializer(context, incoming);
+        // fall through.
+        // fall through
+      case OK:
+        connection.sendResult(materializer.convertNext(false));
+        context.batchesCompleted.inc(1);
+        context.recordsCompleted.inc(incoming.getRecordCount());
+        return !isLast;
+      default:
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    @Override
+    public void stop() {
+      incoming.kill();
+    }
+
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
new file mode 100644
index 0000000..60c2d78
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -0,0 +1,89 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.SingleSender;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.FragmentWritableBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+public class SingleSenderCreator implements RootCreator<SingleSender>{
+
+  @Override
+  public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    assert children != null && children.size() == 1;
+    return new SingleSenderRootExec(context, children.iterator().next(), config);
+  }
+  
+  
+  private static class SingleSenderRootExec implements RootExec{
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
+    private RecordBatch incoming;
+    private BitTunnel tunnel;
+    private FragmentHandle handle;
+    private int recMajor;
+    private FragmentContext context;
+    
+    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+      logger.debug("Creating single sender root exec base on config: {}", config);
+      this.incoming = batch;
+      this.handle = context.getHandle();
+      this.recMajor = config.getOppositeMajorFragmentId();
+      this.tunnel = context.getCommunicator().getTunnel(config.getDestination());
+      this.context = context;
+    }
+    
+    @Override
+    public boolean next() {
+      IterOutcome out = incoming.next();
+      logger.debug("Outcome of sender next {}", out);
+      switch(out){
+      case STOP:
+      case NONE:
+        FragmentWritableBatch b2 = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(context, b2);
+        return false;
+        
+
+      case OK:
+      case OK_NEW_SCHEMA:
+        FragmentWritableBatch batch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
+        tunnel.sendRecordBatch(context, batch);
+        return true;
+
+      case NOT_YET:
+      default:
+        throw new IllegalStateException();
+      }
+    }
+
+    @Override
+    public void stop() {
+    }
+    
+    
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
new file mode 100644
index 0000000..fc7f833
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -0,0 +1,99 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.record.RawFragmentBatchProvider;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+public class WireRecordBatch implements RecordBatch{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
+
+  private RecordBatchLoader batchLoader;
+  private RawFragmentBatchProvider fragProvider;
+  private FragmentContext context;
+
+  
+  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
+    this.fragProvider = fragProvider;
+    this.context = context;
+    this.batchLoader = new RecordBatchLoader(context.getAllocator());
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return null;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return batchLoader.getRecordCount();
+  }
+
+  @Override
+  public void kill() {
+    fragProvider.kill(context);
+  }
+
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    return batchLoader.getValueVector(fieldId, clazz);
+  }
+
+  @Override
+  public IterOutcome next() {
+    RawFragmentBatch batch = this.fragProvider.getNext();
+    try{
+      if(batch == null) return IterOutcome.NONE;
+
+      RecordBatchDef rbd = batch.getHeader().getDef();
+      boolean schemaChanged = batchLoader.load(rbd, batch.getBody());
+      if(schemaChanged){
+        return IterOutcome.OK_NEW_SCHEMA;
+      }else{
+        return IterOutcome.OK;
+      }
+    }catch(SchemaChangeException ex){
+      context.fail(ex);
+      return IterOutcome.STOP;
+    }
+  }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return batchLoader.getWritableBatch();
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
new file mode 100644
index 0000000..187e6e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.WritableBatch;
+
+public class QueryWritableBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
+  
+  private final QueryResult header;
+  private final ByteBuf[] buffers;
+  
+  
+  public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
+    super();
+    this.header = header;
+    this.buffers = buffers;
+  }
+
+  public ByteBuf[] getBuffers(){
+    return buffers;
+  }
+
+  public QueryResult getHeader() {
+    return header;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
new file mode 100644
index 0000000..17c65e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/RecordMaterializer.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+
+public interface RecordMaterializer {
+  
+  public QueryWritableBatch convertNext(boolean isLast);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
new file mode 100644
index 0000000..e2d2eb9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl.materialize;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.WritableBatch;
+
+public class VectorRecordMaterializer implements RecordMaterializer{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorRecordMaterializer.class);
+
+  private QueryId queryId;
+  private RecordBatch batch;
+
+  public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
+    this.queryId = context.getHandle().getQueryId();
+    this.batch = batch;
+
+    for (MaterializedField f : batch.getSchema()) {
+      logger.debug("New Field: {}", f);
+    }
+  }
+
+  public QueryWritableBatch convertNext(boolean isLast) {
+    WritableBatch w = batch.getWritableBatch();
+
+    QueryResult header = QueryResult.newBuilder() //
+        .setQueryId(queryId) //
+        .setRowCount(batch.getRecordCount()) //
+        .setDef(w.getDef()).setIsLastChunk(isLast).build();
+    QueryWritableBatch batch = new QueryWritableBatch(header, w.getBuffers());
+    return batch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
new file mode 100644
index 0000000..9b2cb85
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractOpWrapperVisitor.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+
+public abstract class AbstractOpWrapperVisitor<RET, EXCEP extends Throwable> extends
+    AbstractPhysicalVisitor<RET, Wrapper, EXCEP> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractOpWrapperVisitor.class);
+
+  @Override
+  public final RET visitExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+    if (wrapper.getNode().getSendingExchange() == exchange) {
+      return visitSendingExchange(exchange, wrapper);
+    } else {
+      return visitReceivingExchange(exchange, wrapper);
+    }
+  }
+
+  public RET visitSendingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+    return visitOp(exchange, wrapper);
+  }
+
+  public RET visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws EXCEP {
+    return visitOp(exchange, wrapper);
+  }
+
+}