You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:38 UTC

[17/53] [abbrv] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
index 85c573d..75dce2c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
@@ -23,16 +23,19 @@ import static com.google.common.collect.Collections2.transform;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Function;
 import com.netflix.curator.RetryPolicy;
 import com.netflix.curator.framework.CuratorFramework;
 import com.netflix.curator.framework.CuratorFrameworkFactory;
 import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.framework.state.ConnectionStateListener;
 import com.netflix.curator.retry.RetryNTimes;
 import com.netflix.curator.x.discovery.ServiceDiscovery;
 import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
@@ -52,6 +55,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
   private ServiceCache<DrillbitEndpoint> serviceCache;
   private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList();
   private final String serviceName;
+  private final CountDownLatch initialConnection = new CountDownLatch(1);
 
   public ZKClusterCoordinator(DrillConfig config) throws IOException {
 
@@ -64,6 +68,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       .retryPolicy(rp)
       .connectString(config.getString(ExecConstants.ZK_CONNECTION))
       .build();
+    curator.getConnectionStateListenable().addListener(new InitialConnectionListener());
     discovery = getDiscovery();
     serviceCache = discovery.
       serviceCacheBuilder()
@@ -72,15 +77,36 @@ public class ZKClusterCoordinator extends ClusterCoordinator {
       .build();
   }
 
-  public void start() throws Exception {
+  public void start(long millisToWait) throws Exception {
     logger.debug("Starting ZKClusterCoordination.");
     curator.start();
     discovery.start();
     serviceCache.start();
     serviceCache.addListener(new ZKListener());
+    
+    if(millisToWait != 0){
+      boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS);
+      if(!success) throw new IOException(String.format("Failure to connect to the zookeeper cluster service within the allotted time of %d milliseconds.", millisToWait));
+    }else{
+      this.initialConnection.await();
+    }
+    
+    
     updateEndpoints();
   }
+  
+  private class InitialConnectionListener implements ConnectionStateListener{
 
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+      if(newState == ConnectionState.CONNECTED){
+        ZKClusterCoordinator.this.initialConnection.countDown();
+        client.getConnectionStateListenable().removeListener(this);
+      }
+    }
+    
+  }
+  
   private class ZKListener implements ServiceCacheListener {
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
new file mode 100644
index 0000000..9c18e51
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/BitComException.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+public class BitComException extends DrillException{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComException.class);
+
+  public BitComException() {
+    super();
+  }
+
+  public BitComException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  public BitComException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public BitComException(String message) {
+    super(message);
+  }
+
+  public BitComException(Throwable cause) {
+    super(cause);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
deleted file mode 100644
index a4899bd..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.exception;
-
-import org.apache.drill.common.exceptions.DrillException;
-
-public class ExecutionSetupException extends DrillException{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionSetupException.class);
-  
-  public ExecutionSetupException() {
-    super();
-  }
-
-  public ExecutionSetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
-    super(message, cause, enableSuppression, writableStackTrace);
-  }
-
-  public ExecutionSetupException(String message, Throwable cause) {
-    super(message, cause);
-  }
-
-  public ExecutionSetupException(String message) {
-    super(message);
-  }
-
-  public ExecutionSetupException(Throwable cause) {
-    super(cause);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
index c273463..dbd66b9 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FragmentSetupException.java
@@ -17,6 +17,8 @@
  ******************************************************************************/
 package org.apache.drill.exec.exception;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
 public class FragmentSetupException extends ExecutionSetupException{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentSetupException.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
deleted file mode 100644
index 30e7a63..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/CancelableQuery.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public interface CancelableQuery {
-  public boolean cancel(long queryid);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
deleted file mode 100644
index 4e4ec77..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ExecutionPlanner.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public class ExecutionPlanner {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionPlanner.class);
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
deleted file mode 100644
index f138171..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/Foreman.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-
-public class Foreman extends Thread{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-  
-  public Foreman(){
-    
-  }
-  
-  public void doWork(QueryWorkUnit work){
-    // generate fragment structure. 
-    // store fragments in distributed grid.
-    // generate any codegen required and store in grid.
-    // drop 
-    // do get on the result set you're looking for.  Do the initial get on the result node you're looking for.  This will return either data or a metadata record set
-  }
-
-  public boolean checkStatus(long queryId){
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
deleted file mode 100644
index bdf4a1e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/QueryWorkUnit.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-
-import com.google.common.base.Preconditions;
-
-public class QueryWorkUnit {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
-  
-  private PlanFragment rootFragment; // for local
-  private List<PlanFragment> fragments;
-  
-  public QueryWorkUnit(PlanFragment rootFragment, List<PlanFragment> fragments) {
-    super();
-    Preconditions.checkNotNull(rootFragment);
-    Preconditions.checkNotNull(fragments);
-    this.rootFragment = rootFragment;
-    this.fragments = fragments;
-  }
-
-  public PlanFragment getRootFragment() {
-    return rootFragment;
-  }
-
-  public List<PlanFragment> getFragments() {
-    return fragments;
-  }
-  
-  
-  
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
deleted file mode 100644
index 96d7d1e..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/ResourceRequest.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-public class ResourceRequest {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceRequest.class);
-  
-  public long memoryMin;
-  public long memoryDesired;
-  
-
-  public static class ResourceAllocation {
-    public long memory;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
deleted file mode 100644
index fee6172..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/foreman/StatusProvider.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.foreman;
-
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-
-public interface StatusProvider {
-  public FragmentStatus getStatus();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
index 2b3f574..6bddab7 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java
@@ -19,11 +19,10 @@ package org.apache.drill.exec.memory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
 
 import java.io.Closeable;
 
-import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.common.config.DrillConfig;
 
 /**
  * Wrapper class to deal with byte buffer allocation. Ensures users only use designated methods.  Also allows inser 
@@ -48,7 +47,7 @@ public abstract class BufferAllocator implements Closeable{
   @Override
   public abstract void close(); 
   
-  public static BufferAllocator getAllocator(DrillbitContext context){
+  public static BufferAllocator getAllocator(DrillConfig config){
     // TODO: support alternative allocators (including a debugging allocator that records all allocation locations for each buffer).
     return new DirectBufferAllocator();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
index 6b89c12..1f47041 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/metrics/SingleThreadNestedCounter.java
@@ -22,34 +22,34 @@ import org.apache.drill.exec.server.DrillbitContext;
 import com.yammer.metrics.Counter;
 
 /**
- * Wraps a parent counter so that local in thread metrics can be collected while collecting for a global counter.
+ * Wraps a parent counter so that local in-thread metrics can be collected while collecting for a global counter. Note
+ * that this one writer, many reader safe.
  */
 public class SingleThreadNestedCounter {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleThreadNestedCounter.class);
-  
+
   private volatile long count;
   private final Counter counter;
-  
-  
+
   public SingleThreadNestedCounter(DrillbitContext context, String name) {
     super();
     this.counter = context.getMetrics().counter(name);
   }
 
-  public long inc(long n){
+  public long inc(long n) {
     counter.inc(n);
-    count+= n;
+    count += n;
     return count;
   }
-  
-  public long dec(long n){
+
+  public long dec(long n) {
     counter.dec(n);
     count -= n;
     return count;
   }
-  
-  public long get(){
+
+  public long get() {
     return count;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
deleted file mode 100644
index f626cea..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FilteringRecordBatchTransformer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.SelectionVector;
-
-public abstract class FilteringRecordBatchTransformer {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilteringRecordBatchTransformer.class);
-  
-  final RecordBatch incoming;
-  final SelectionVector selectionVector;
-  final BatchSchema schema;
-  
-  public FilteringRecordBatchTransformer(RecordBatch incoming, OutputMutator output, SelectionVector selectionVector) {
-    super();
-    this.incoming = incoming;
-    this.selectionVector = selectionVector;
-    this.schema = innerSetup();
-  }
-
-  public abstract BatchSchema innerSetup();
-  
-  /**
-   * Applies the filter to the selection index.  Ignores any values in the selection vector, instead creating a.
-   * @return
-   */
-  public abstract int apply();
-  
-  /**
-   * Applies the filter to the selection index.  Utilizes the existing selection index and only evaluates on those records.
-   * @return
-   */
-  public abstract int applyWithSelection();
-
-  public BatchSchema getSchema() {
-    return schema;
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 0cf17e9..e64453c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -20,10 +20,15 @@ package org.apache.drill.exec.ops;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.SingleThreadNestedCounter;
-import org.apache.drill.exec.planner.FragmentRunnable;
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.physical.impl.FilteringRecordBatchTransformer;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
 import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
@@ -34,51 +39,72 @@ import com.yammer.metrics.Timer;
 public class FragmentContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
 
-  private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentRunnable.class, "completionTimes");
-  private final static String METRIC_BATCHES_COMPLETED = MetricRegistry.name(FragmentRunnable.class, "batchesCompleted");
-  private final static String METRIC_RECORDS_COMPLETED = MetricRegistry.name(FragmentRunnable.class, "recordsCompleted");
-  private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentRunnable.class, "dataProcessed");
+  private final static String METRIC_TIMER_FRAGMENT_TIME = MetricRegistry.name(FragmentRunner.class, "completionTimes");
+  private final static String METRIC_BATCHES_COMPLETED = MetricRegistry.name(FragmentRunner.class, "batchesCompleted");
+  private final static String METRIC_RECORDS_COMPLETED = MetricRegistry.name(FragmentRunner.class, "recordsCompleted");
+  private final static String METRIC_DATA_PROCESSED = MetricRegistry.name(FragmentRunner.class, "dataProcessed");
 
   private final DrillbitContext context;
-  private final PlanFragment fragment;
   public final SingleThreadNestedCounter batchesCompleted;
   public final SingleThreadNestedCounter recordsCompleted;
   public final SingleThreadNestedCounter dataProcessed;
   public final Timer fragmentTime;
+  private final FragmentHandle handle;
+  private final UserClientConnection connection;
+  private final IncomingBuffers buffers;
 
-  public FragmentContext(DrillbitContext dbContext, PlanFragment fragment) {
+  public FragmentContext(DrillbitContext dbContext, FragmentHandle handle, UserClientConnection connection, IncomingBuffers buffers) {
     this.fragmentTime = dbContext.getMetrics().timer(METRIC_TIMER_FRAGMENT_TIME);
     this.batchesCompleted = new SingleThreadNestedCounter(dbContext, METRIC_BATCHES_COMPLETED);
     this.recordsCompleted = new SingleThreadNestedCounter(dbContext, METRIC_RECORDS_COMPLETED);
     this.dataProcessed = new SingleThreadNestedCounter(dbContext, METRIC_DATA_PROCESSED);
     this.context = dbContext;
-    this.fragment = fragment;
+    this.connection = connection;
+    this.handle = handle;
+    this.buffers = buffers;
   }
 
   public void fail(Throwable cause) {
 
   }
 
+  
   public DrillbitContext getDrillbitContext(){
     return context;
   }
-  
-  public PlanFragment getFragment() {
-    return fragment;
+
+  public DrillbitEndpoint getIdentity(){
+    return context.getEndpoint();
   }
   
+  public FragmentHandle getHandle() {
+    return handle;
+  }
+
   public BufferAllocator getAllocator(){
     // TODO: A local query allocator to ensure memory limits and accurately gauge memory usage.
     return context.getAllocator();
   }
 
-  
   public FilteringRecordBatchTransformer getFilteringExpression(LogicalExpression expr){
     return null;
   }
   
+  public void addMetricsToStatus(FragmentStatus.Builder stats){
+    stats.setBatchesCompleted(batchesCompleted.get());
+    stats.setDataProcessed(dataProcessed.get());
+    stats.setRecordsCompleted(recordsCompleted.get());
+  }
   
+  public UserClientConnection getConnection() {
+    return connection;
+  }
+
   public BitCom getCommunicator(){
-    return null;
+    return context.getBitCom();
+  }
+  
+  public IncomingBuffers getBuffers(){
+    return buffers;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
deleted file mode 100644
index 3c75648..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentConverter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
-
-public class FragmentConverter {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentConverter.class);
-  
-  public static FragmentRoot getFragment(FragmentContext context){
-    PlanFragment m = context.getFragment();
-    
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
deleted file mode 100644
index ddacb41..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentRoot.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-
-/**
- * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
- * output nodes and storage nodes.  They are there driving force behind the completion of a query.
- */
-public interface FragmentRoot {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentRoot.class);
-  
-  /**
-   * Do the next batch of work.  
-   * @return Whether or not additional batches of work are necessary.
-   */
-  public boolean next();
-  
-  
-  public void setup() throws FragmentSetupException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
deleted file mode 100644
index 8d4e807..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorFactory.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-public class OperatorFactory {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorFactory.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
deleted file mode 100644
index 59abdc4..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public interface OutputMutator {
-  public void removeField(int fieldId) throws SchemaChangeException;
-  public void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException ;
-  public void setNewSchema(BatchSchema schema) throws SchemaChangeException ;
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fe37e70..fd24deb 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -19,33 +19,42 @@ package org.apache.drill.exec.ops;
 
 import java.util.Collection;
 
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.server.DrillbitContext;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 public class QueryContext {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
   
-  private long queryId;
+  private QueryId queryId;
   private DrillbitContext drillbitContext;
   
-  public QueryContext(long queryId, DrillbitContext drllbitContext) {
+  public QueryContext(QueryId queryId, DrillbitContext drllbitContext) {
     super();
     this.queryId = queryId;
     this.drillbitContext = drllbitContext;
   }
   
-  public long getQueryId() {
-    return queryId;
+  public DrillbitEndpoint getCurrentEndpoint(){
+    return drillbitContext.getEndpoint();
   }
   
-  public ObjectMapper getMapper(){
-    return drillbitContext.getConfig().getMapper();
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public DistributedCache getCache(){
+    return drillbitContext.getCache();
   }
   
   public Collection<DrillbitEndpoint> getActiveEndpoints(){
     return drillbitContext.getBits();
   }
   
+  public PhysicalPlanReader getPlanReader(){
+    return drillbitContext.getPlanReader();
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
deleted file mode 100644
index b46804f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-import java.util.Iterator;
-
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.ValueVector;
-import org.apache.drill.exec.store.RecordReader;
-
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.carrotsearch.hppc.procedures.IntObjectProcedure;
-
-/**
- * Record batch used for a particular scan. Operators against one or more
- */
-public abstract class ScanBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
-
-  private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
-  private BatchSchema schema;
-  private int recordCount;
-  private boolean schemaChanged = true;
-  private final FragmentContext context;
-  private Iterator<RecordReader> readers;
-  private RecordReader currentReader;
-  private final BatchSchema expectedSchema;
-  private final Mutator mutator = new Mutator();
-
-  public ScanBatch(BatchSchema expectedSchema, Iterator<RecordReader> readers, FragmentContext context)
-      throws ExecutionSetupException {
-    this.expectedSchema = expectedSchema;
-    this.context = context;
-    this.readers = readers;
-    if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
-    this.currentReader = readers.next();
-    this.currentReader.setup(expectedSchema, mutator);
-  }
-
-  private void schemaChanged() {
-    schema = null;
-    schemaChanged = true;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return context;
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return recordCount;
-  }
-
-  @Override
-  public void kill() {
-    releaseAssets();
-  }
-
-  private void releaseAssets() {
-    fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
-      @Override
-      public void apply(int key, ValueVector<?> value) {
-        value.close();
-      }
-    });
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
-    if (fields.containsKey(fieldId))
-      throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d."));
-    ValueVector<?> vector = this.fields.lget();
-    if (vector.getClass().isAssignableFrom(clazz)) {
-      return (T) vector;
-    } else {
-      throw new InvalidValueAccessor(String.format(
-          "You requested a field accessor of type %s for field id %d but the actual type was %s.",
-          clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
-    }
-  }
-
-  @Override
-  public IterOutcome next() {
-    while ((recordCount = currentReader.next()) == 0) {
-      try {
-        if (!readers.hasNext()) {
-          currentReader.cleanup();
-          releaseAssets();
-          return IterOutcome.NONE;
-        }
-        currentReader.cleanup();
-        currentReader = readers.next();
-        currentReader.setup(expectedSchema, mutator);
-      } catch (ExecutionSetupException e) {
-        this.context.fail(e);
-        releaseAssets();
-        return IterOutcome.STOP;
-      }
-    }
-
-    if (schemaChanged) {
-      schemaChanged = false;
-      return IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      return IterOutcome.OK;
-    }
-  }
-
-  private class Mutator implements OutputMutator {
-
-    public void removeField(int fieldId) throws SchemaChangeException {
-      schemaChanged();
-      ValueVector<?> v = fields.remove(fieldId);
-      if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
-      v.close();
-    }
-
-    public void addField(int fieldId, ValueVector<?> vector) {
-      schemaChanged();
-      ValueVector<?> v = fields.put(fieldId, vector);
-      if (v != null) v.close();
-    }
-
-    @Override
-    public void setNewSchema(BatchSchema schema) {
-      ScanBatch.this.schema = schema;
-      ScanBatch.this.schemaChanged = true;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
deleted file mode 100644
index 0fc7a1f..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/StreamingRecordBatch.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops;
-
-/**
- * Works on one incoming batch at a time.  Creates one output batch for each input batch.
- */
-public class StreamingRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingRecordBatch.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
deleted file mode 100644
index 07d7099..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/ExchangeRecordBatch.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class ExchangeRecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExchangeRecordBatch.class);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
deleted file mode 100644
index 0e35932..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.exchange;
-
-public class RecordBatchSender {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSender.class);
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
deleted file mode 100644
index 5bef612..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/FilterRecordBatch.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.filter;
-
-import org.apache.drill.exec.ops.FilteringRecordBatchTransformer;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.InvalidValueAccessor;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.vector.SelectionVector;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public abstract class FilterRecordBatch implements RecordBatch {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
-  private RecordBatch incoming;
-  private SelectionVector selectionVector;
-  private BatchSchema schema;
-  private FilteringRecordBatchTransformer transformer;
-  private int outstanding;
-
-  public FilterRecordBatch(RecordBatch batch) {
-    this.incoming = batch;
-  }
-
-  @Override
-  public FragmentContext getContext() {
-    return incoming.getContext();
-  }
-
-  @Override
-  public BatchSchema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public int getRecordCount() {
-    return 0;
-  }
-
-  @Override
-  public void kill() {
-    incoming.kill();
-  }
-
-  @Override
-  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
-    return null;
-  }
-
-  abstract int applyFilter(SelectionVector vector, int count);
-
-  /**
-   * Release all assets.
-   */
-  private void close() {
-
-  }
-
-  @Override
-  public IterOutcome next() {
-    while (true) {
-      IterOutcome o = incoming.next();
-      switch (o) {
-      case OK_NEW_SCHEMA:
-        transformer = incoming.getContext().getFilteringExpression(null);
-        schema = transformer.getSchema();
-        // fall through to ok.
-      case OK:
-
-      case NONE:
-      case STOP:
-        close();
-        return IterOutcome.STOP;
-      }
-
-      if (outstanding > 0) {
-        // move data to output location.
-
-        for (int i = incoming.getRecordCount() - outstanding; i < incoming.getRecordCount(); i++) {
-
-        }
-      }
-
-      // make sure the bit vector is as large as the current record batch.
-      if (selectionVector.size() < incoming.getRecordCount()) {
-        selectionVector.allocateNew(incoming.getRecordCount());
-      }
-
-      return null;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
deleted file mode 100644
index 218a19a..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/filter/SelectionVectorUpdater.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.ops.filter;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.DirectBufferAllocator;
-import org.apache.drill.exec.record.vector.NullableInt32Vector;
-import org.apache.drill.exec.record.vector.UInt16Vector;
-import org.codehaus.janino.ExpressionEvaluator;
-
-public class SelectionVectorUpdater {
-  //static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVectorUpdater.class);
-
-  // Add a selection vector to a record batch.
-  /**
-   * where a + b < 10
-   */
-
-  public static int applyToBatch(final int recordCount, final NullableInt32Vector a, final NullableInt32Vector b,
-      final UInt16Vector selectionVector) {
-    int selectionIndex = 0;
-    for (int i = 0; i < recordCount; i++) {
-      int isNotNull = a.isNull(i) * b.isNull(i);
-      if (isNotNull > 0 && a.get(i) + b.get(i) < 10) {
-        selectionVector.set(selectionIndex, (char) i);
-        selectionIndex++;
-      }
-    }
-    return selectionIndex;
-  }
-
-  public static void mai2n(String[] args) {
-    int size = 1024;
-    BufferAllocator allocator = new DirectBufferAllocator();
-    NullableInt32Vector a = new NullableInt32Vector(0, allocator);
-    NullableInt32Vector b = new NullableInt32Vector(1, allocator);
-    UInt16Vector select = new UInt16Vector(2, allocator);
-    a.allocateNew(size);
-    b.allocateNew(size);
-    select.allocateNew(size);
-    int r = 0;
-    for (int i = 0; i < 1500; i++) {
-      r += applyToBatch(size, a, b, select);
-    }
-
-    System.out.println(r);
-  }
-  
-public static void main(String[] args) throws Exception{
-  ExpressionEvaluator ee = new ExpressionEvaluator(
-      "c > d ? c : d",                     // expression
-      int.class,                           // expressionType
-      new String[] { "c", "d" },           // parameterNames
-      new Class[] { int.class, int.class } // parameterTypes
-  );
-  
-  Integer res = (Integer) ee.evaluate(
-      new Object[] {          // parameterValues
-          new Integer(10),
-          new Integer(11),
-      }
-  );
-  System.out.println("res = " + res);
-}
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
index 70a42be..d2aaca3 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
@@ -19,8 +19,7 @@ package org.apache.drill.exec.opt;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.logical.LogicalPlan;
-import org.apache.drill.common.optimize.Optimizer;
-import org.apache.drill.common.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.PhysicalPlan;
 
 public class IdentityOptimizer extends Optimizer {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
new file mode 100644
index 0000000..9f506c1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.opt;
+
+import java.io.Closeable;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+public abstract class Optimizer implements Closeable{
+  
+  public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
+  
+  public abstract void init(DrillConfig config);
+  
+  public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan);
+  public abstract void close();
+  
+  public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
+    Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+    o.init(config);
+    return o;
+  }
+  
+  public interface OptimizationContext{
+    public int getPriority();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
new file mode 100644
index 0000000..334119d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/DataValidationMode.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+public enum DataValidationMode {
+  TERMINATE, // terminate the query if the data doesn't match expected.
+  DROP_RECORD, // drop the record that doesn't match the expected situation.
+  SINK_RECORD // record the failed record along with the rule violation in a secondary location.
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
new file mode 100644
index 0000000..d7b21db
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+
+public class EndpointAffinity implements Comparable<EndpointAffinity>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class);
+  
+  private DrillbitEndpoint endpoint;
+  private float affinity = 0.0f;
+  
+  public EndpointAffinity(DrillbitEndpoint endpoint) {
+    super();
+    this.endpoint = endpoint;
+  }
+  
+  public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) {
+    super();
+    this.endpoint = endpoint;
+    this.affinity = affinity;
+  }
+
+  public DrillbitEndpoint getEndpoint() {
+    return endpoint;
+  }
+  public void setEndpoint(DrillbitEndpoint endpoint) {
+    this.endpoint = endpoint;
+  }
+  public float getAffinity() {
+    return affinity;
+  }
+  
+  @Override
+  public int compareTo(EndpointAffinity o) {
+    return Float.compare(affinity, o.affinity);
+  }
+  
+  public void addAffinity(float f){
+    affinity += f;
+  }
+  
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
new file mode 100644
index 0000000..ebe6446
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/OperatorCost.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class OperatorCost {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorCost.class);
+  
+  private final float network; 
+  private final float disk;
+  private final float memory;
+  private final float cpu;
+  
+  
+  
+  @JsonCreator
+  public OperatorCost(@JsonProperty("network") float network, @JsonProperty("disk") float disk, @JsonProperty("memory") float memory, @JsonProperty("cpu") float cpu) {
+    super();
+    this.network = network;
+    this.disk = disk;
+    this.memory = memory;
+    this.cpu = cpu;
+  }
+
+  public float getNetwork() {
+    return network;
+  }
+
+  public float getDisk() {
+    return disk;
+  }
+
+  public float getMemory() {
+    return memory;
+  }
+
+  public float getCpu() {
+    return cpu;
+  }
+  
+  public static OperatorCost combine(OperatorCost c1, OperatorCost c2){
+    return new OperatorCost(c1.network + c2.network, c1.disk + c2.disk, c1.memory + c2.memory, c1.cpu + c2.cpu);
+  }
+
+  public OperatorCost add(OperatorCost c2){
+    return combine(this, c2);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
new file mode 100644
index 0000000..84bfc87
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.PlanProperties;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.graph.Graph;
+import org.apache.drill.common.graph.GraphAlgos;
+import org.apache.drill.exec.physical.base.Leaf;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Root;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Lists;
+
+@JsonPropertyOrder({ "head", "graph" })
+public class PhysicalPlan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlan.class);
+  
+  PlanProperties properties;
+  
+  Graph<PhysicalOperator, Root, Leaf> graph;
+  
+  @JsonCreator
+  public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
+    this.properties = properties;
+    this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
+  }
+  
+  @JsonProperty("graph")
+  public List<PhysicalOperator> getSortedOperators(){
+    // reverse the list so that nested references are flattened rather than nested.
+    return getSortedOperators(true);
+  }
+  
+  public List<PhysicalOperator> getSortedOperators(boolean reverse){
+    List<PhysicalOperator> list = GraphAlgos.TopoSorter.sort(graph);
+    if(reverse){
+      return Lists.reverse(list);
+    }else{
+      return list;
+    }
+    
+  }
+
+
+  @JsonProperty("head")
+  public PlanProperties getProperties() {
+    return properties;
+  }
+
+  /** Parses a physical plan. */
+  public static PhysicalPlan parse(ObjectReader reader, String planString) {
+    try {
+      PhysicalPlan plan = reader.readValue(planString);
+      return plan;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Converts a physical plan to a string. (Opposite of {@link #parse}.) */
+  public String unparse(ObjectWriter writer) {
+    try {
+      return writer.writeValueAsString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
new file mode 100644
index 0000000..02fe025
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/ReadEntry.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Describes a chunk of read work that will be done.
+ */
+public interface ReadEntry {
+  @JsonIgnore
+  public OperatorCost getCost();
+
+  @JsonIgnore
+  public Size getSize();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
new file mode 100644
index 0000000..db3390a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/RecordField.java
@@ -0,0 +1,60 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+import org.apache.drill.common.expression.types.DataType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RecordField {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordField.class);
+
+  
+  private DataType type;
+  private ValueMode mode;
+  
+  @JsonCreator
+  public RecordField(@JsonProperty("type") DataType type, @JsonProperty("mode") ValueMode mode) {
+    super();
+    this.type = type;
+    this.mode = mode;
+  }
+
+  public DataType getType() {
+    return type;
+  }
+
+  public ValueMode getMode() {
+    return mode;
+  }
+  
+  public static enum ValueMode {
+    VECTOR,
+    DICT,
+    RLE
+  }
+  
+  public static enum ValueType {
+    OPTIONAL,
+    REQUIRED, 
+    REPEATED
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
new file mode 100644
index 0000000..96bd996
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/WriteEntry.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical;
+
+public interface WriteEntry {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WriteEntry.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
new file mode 100644
index 0000000..e91257e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.base;
+
+import org.apache.drill.common.graph.GraphVisitor;
+import org.apache.drill.exec.physical.OperatorCost;
+
+public abstract class AbstractBase implements PhysicalOperator{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
+
+
+
+  @Override
+  public void accept(GraphVisitor<PhysicalOperator> visitor) {
+    visitor.enter(this);
+    if(this.iterator() == null) throw new IllegalArgumentException("Null iterator for pop." + this);
+    for(PhysicalOperator o : this){
+      o.accept(visitor);  
+    }
+    visitor.leave(this);
+  }
+  
+  @Override
+  public boolean isExecutable() {
+    return true;
+  }
+  
+}