You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/07/20 03:57:29 UTC

[08/53] [abbrv] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
new file mode 100644
index 0000000..3f710ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/LocalFragmentHandler.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+/**
+ * Incoming-batch handler for a fragment whose {@link FragmentRunner} is supplied by the caller at
+ * construction time rather than built by the handler itself.
+ *
+ * <p>Arriving batches are passed to the supplied {@link IncomingBuffers}. The handler counts as done once
+ * {@link #cancel()} has been called or the buffers report that they are done.
+ */
+public class LocalFragmentHandler implements IncomingFragmentHandler{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalFragmentHandler.class);
+
+  private final IncomingBuffers buffers;
+  private final FragmentRunner runner;
+  private final FragmentHandle handle;
+  // Set by cancel() and read by isDone(); volatile so a write from one thread is visible to others.
+  private volatile boolean cancel = false;
+
+  /**
+   * @param handle  handle returned unchanged by {@link #getHandle()}
+   * @param buffers destination for every batch passed to handle(ConnectionThrottle, RawFragmentBatch)
+   * @param runner  runner returned unchanged by {@link #getRunnable()}
+   */
+  public LocalFragmentHandler(FragmentHandle handle, IncomingBuffers buffers, FragmentRunner runner) {
+    super();
+    this.handle = handle;
+    this.buffers = buffers;
+    this.runner = runner;
+  }
+
+  /**
+   * Forwards an arriving batch to the incoming buffers.
+   *
+   * @return the value returned by {@code IncomingBuffers.batchArrived}
+   */
+  @Override
+  public boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+    return buffers.batchArrived(throttle, batch);
+  }
+
+  /** Returns the runner that was passed to the constructor. */
+  @Override
+  public FragmentRunner getRunnable() {
+    return runner;
+  }
+
+  /** Returns the fragment handle that was passed to the constructor. */
+  @Override
+  public FragmentHandle getHandle() {
+    return handle;
+  }
+
+  /** Marks this handler as cancelled; {@link #isDone()} returns true from then on. */
+  @Override
+  public void cancel() {
+    cancel = true;
+  }
+
+  /**
+   * @return true once the handler has been cancelled or the incoming buffers report they are done
+   */
+  @Override
+  public boolean isDone() {
+    // Must ask the buffers: calling isDone() on this handler here would recurse without end
+    // (StackOverflowError) whenever the handler has not been cancelled.
+    return cancel || buffers.isDone();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
new file mode 100644
index 0000000..70d7e93
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -0,0 +1,123 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.fragment;
+
+import java.io.IOException;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentLeaf;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.FragmentRunnerListener;
+import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.batch.IncomingBuffers;
+
+/**
+ * Receives all incoming traffic for a particular FragmentHandle and tracks whether that fragment has
+ * been cancelled or its incoming buffers are done.
+ */
+public class RemoteFragmentHandler implements IncomingFragmentHandler {
+  private final PlanFragment fragment;
+  private FragmentLeaf root;
+  private final IncomingBuffers buffers;
+  private final FragmentRunnerListener runnerListener;
+  private volatile FragmentRunner runner;
+  private volatile boolean cancel = false;
+  private final FragmentContext context;
+  private final PhysicalPlanReader reader;
+
+  /**
+   * Decodes the fragment's JSON into its operator tree and prepares the buffers, fragment context and
+   * runner listener for it.
+   *
+   * @throws FragmentSetupException if decoding the fragment raises an IOException
+   */
+  public RemoteFragmentHandler(PlanFragment fragment, DrillbitContext context, BitTunnel foremanTunnel) throws FragmentSetupException{
+    try{
+      this.fragment = fragment;
+
+      final FragmentLeaf decodedRoot = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+      this.root = decodedRoot;
+
+      final IncomingBuffers incoming = new IncomingBuffers(decodedRoot);
+      this.buffers = incoming;
+
+      final FragmentContext fragmentContext = new FragmentContext(context, fragment.getHandle(), null, incoming);
+      this.context = fragmentContext;
+
+      this.runnerListener = new RemotingFragmentRunnerListener(fragmentContext, foremanTunnel);
+      this.reader = context.getPlanReader();
+    }catch(IOException e){
+      throw new FragmentSetupException("Failure while decoding fragment.", e);
+    }
+  }
+
+  /**
+   * Passes an arriving batch on to the incoming buffers.
+   *
+   * @return the value returned by {@code IncomingBuffers.batchArrived}
+   */
+  @Override
+  public boolean handle(ConnectionThrottle throttle, RawFragmentBatch batch) throws FragmentSetupException {
+    final boolean result = buffers.batchArrived(throttle, batch);
+    return result;
+  }
+
+  /**
+   * Builds the runner for this fragment. May produce a runner only once.
+   *
+   * @return the new runner, or null when the handler was already cancelled or setup failed (a setup
+   *         failure is reported to the runner listener)
+   * @throws IllegalStateException if a runner has already been created
+   */
+  @Override
+  public FragmentRunner getRunnable(){
+    synchronized(this){
+      if(runner != null){
+        throw new IllegalStateException("Get Runnable can only be run once.");
+      }
+      if(cancel){
+        return null;
+      }
+
+      try {
+        runner = createRunner();
+        return runner;
+      } catch (IOException | ExecutionSetupException e) {
+        runnerListener.fail(fragment.getHandle(), "Failure while setting up remote fragment.", e);
+        return null;
+      }
+    }
+  }
+
+  /** Re-reads the fragment's operator tree and wraps its executable form in a new runner. */
+  private FragmentRunner createRunner() throws IOException, ExecutionSetupException {
+    final FragmentRoot fragRoot = reader.readFragmentOperator(fragment.getFragmentJson());
+    final RootExec exec = ImplCreator.getExec(context, fragRoot);
+    return new FragmentRunner(context, exec, runnerListener);
+  }
+
+  /** Flags the handler as cancelled and, when a runner already exists, cancels that runner too. */
+  @Override
+  public void cancel(){
+    synchronized(this){
+      cancel = true;
+      final FragmentRunner active = runner;
+      if(active == null){
+        return;
+      }
+      active.cancel();
+    }
+  }
+
+  /** Returns the handle carried by the plan fragment. */
+  @Override
+  public FragmentHandle getHandle() {
+    return fragment.getHandle();
+  }
+
+  /**
+   * @return true once the handler has been cancelled or the incoming buffers report they are done
+   */
+  @Override
+  public boolean isDone() {
+    if(cancel){
+      return true;
+    }
+    return buffers.isDone();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
new file mode 100644
index 0000000..621c7cb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/UserWorker.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work.user;
+
+import java.util.UUID;
+
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.QueryResult.QueryState;
+import org.apache.drill.exec.proto.UserProtos.RequestResults;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.work.FragmentRunner;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Entry point for user-originated requests: submitting queries, fetching results and cancelling queries
+ * or fragments. All work is delegated to the {@link WorkerBee} supplied at construction.
+ */
+public class UserWorker{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserWorker.class);
+
+  private final WorkerBee bee;
+
+  public UserWorker(WorkerBee bee) {
+    super();
+    this.bee = bee;
+  }
+
+  /**
+   * Assigns a query id built from a random UUID, creates a Foreman for the query and registers it with
+   * the bee.
+   *
+   * @return the id assigned to the query
+   */
+  public QueryId submitWork(UserClientConnection connection, RunQuery query){
+    final UUID random = UUID.randomUUID();
+    final long high = random.getMostSignificantBits();
+    final long low = random.getLeastSignificantBits();
+    final QueryId id = QueryId.newBuilder()
+        .setPart1(high)
+        .setPart2(low)
+        .build();
+
+    bee.addNewForeman(new Foreman(bee, bee.getContext(), connection, id, query));
+    return id;
+  }
+
+  /**
+   * Looks up the Foreman for the requested query id and asks it for the result.
+   *
+   * @return the Foreman's result, or a result in state UNKNOWN_QUERY when no Foreman is found
+   */
+  public QueryResult getResult(UserClientConnection connection, RequestResults req){
+    final Foreman owner = bee.getForemanForQueryId(req.getQueryId());
+    if(owner != null){
+      return owner.getResult(connection, req);
+    }
+    return QueryResult.newBuilder().setQueryState(QueryState.UNKNOWN_QUERY).build();
+  }
+
+  /**
+   * Cancels the Foreman registered for the given query id, if there is one.
+   *
+   * @return Acks.OK whether or not a Foreman was found
+   */
+  public Ack cancelQuery(QueryId query){
+    final Foreman owner = bee.getForemanForQueryId(query);
+    if(owner == null){
+      return Acks.OK;
+    }
+    owner.cancel();
+    return Acks.OK;
+  }
+
+  /**
+   * Cancels the runner registered for the given fragment handle, if there is one.
+   *
+   * @return Acks.OK whether or not a runner was found
+   */
+  public Ack cancelFragment(FragmentHandle handle){
+    final FragmentRunner target = bee.getFragmentRunner(handle);
+    if(target == null){
+      return Acks.OK;
+    }
+    target.cancel();
+    return Acks.OK;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
new file mode 100644
index 0000000..5cc5cab
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/Coordination.proto
@@ -0,0 +1,26 @@
+package exec;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "CoordinationProtos";
+option optimize_for = SPEED;
+
+// Describes one Drillbit endpoint: an address, two port numbers and the roles attached to it.
+// NOTE(review): user_port / bit_port are presumably the user-facing and bit-to-bit RPC ports -- confirm.
+message DrillbitEndpoint{
+  optional string address = 1;
+  optional int32 user_port = 2;
+  optional int32 bit_port = 3;
+  optional Roles roles = 4; // role flags, see the Roles message
+}
+
+// A service instance record: an id, a registration time and the endpoint it refers to.
+message DrillServiceInstance{
+  optional string id = 1;
+  optional int64 registrationTimeUTC = 2; // NOTE(review): unit not shown here (epoch millis?) -- confirm
+  optional DrillbitEndpoint endpoint = 3;
+}
+
+// Role flags carried by a DrillbitEndpoint. Every flag defaults to true when it is not set.
+message Roles{
+	optional bool sql_query = 1 [default = true];
+	optional bool logical_plan = 2 [default = true];
+	optional bool physical_plan = 3 [default = true];
+	optional bool java_executor = 4 [default = true];
+	optional bool distributed_cache = 5 [default = true];
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
index 77a7ee1..7501d7c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/ExecutionProtos.proto
@@ -2,9 +2,11 @@ package exec.bit;
 
 option java_package = "org.apache.drill.exec.proto";
 option java_outer_classname = "ExecProtos";
-option optimize_for = LITE_RUNTIME;
-import "SchemaDef.proto";
+option optimize_for = SPEED;
+
 import "Coordination.proto";
+import "UserBitShared.proto";
+
 
 
 ////// UserToBit RPC ///////
@@ -16,7 +18,7 @@ enum RpcType {
     // bit requests
     REQ_INIATILIZE_FRAGMENT = 3; // Returns Handle
     REQ_RECORD_BATCH = 4; // send record batch overview, returns Ack
-    REQ_BATCH_CHUNK = 5; // send additional batch chunk, returns Ack.
+    
     REQ_CANCEL_FRAGMENT = 6; // send a cancellation message for a fragment, returns Ack
 	REQ_FRAGMENT_STATUS = 7; // get a fragment status, returns FragmentStatus
 	REQ_BIT_STATUS = 8; // get bit status.
@@ -25,34 +27,29 @@ enum RpcType {
     RESP_FRAGMENT_HANDLE = 9;
     RESP_FRAGMENT_STATUS = 10;
 	RESP_BIT_STATUS = 11;
-	RESP_BATCH_CHUNK = 12;
 }
 
 
 message BitHandshake{
-	optional DrillbitEndpoint endpoint = 1;
+	optional int32 rpc_version = 1;
+	optional DrillbitEndpoint endpoint = 2;
 }
 
 message BitBatchChunk {}
 
 message BitStatus {
-	repeated ActiveFragment fragment = 1;
-}
-
-message ActiveFragment {
-	optional FragmentStatus status = 1;
-	optional int64 fragment_id = 2;
-	optional int64 query_id = 3; 
+	repeated FragmentStatus fragment_status = 1;
 }
 
 message FragmentStatus {
 	
 	enum FragmentState {
-	  AWAITING_ALLOCATION = 0;
-	  RUNNING = 1;
-	  FINISHED = 2;
-	  CANCELLED = 3;
-	  FAILED = 4;
+	  SENDING = 0;
+	  AWAITING_ALLOCATION = 1;
+	  RUNNING = 2;
+	  FINISHED = 3;
+	  CANCELLED = 4;
+	  FAILED = 5;
 	}
 	
 	optional int64 memory_use = 1;
@@ -61,27 +58,37 @@ message FragmentStatus {
 	optional int32 estimated_completion_percentage = 4;
 	optional FragmentState state = 5;
 	optional int64 data_processed = 6;
+	
+	optional FragmentHandle handle = 7;
+	optional exec.shared.DrillPBError error = 8;
+	optional int64 running_time = 9;
 }
 
-message RecordBatchHeader {
+message FragmentRecordBatch{
+	optional FragmentHandle handle = 1;
+	optional int32 sending_major_fragment_id = 2;
+	optional int32 sending_minor_fragment_id = 3;
+	optional exec.shared.RecordBatchDef def = 4;
+	optional bool isLastBatch = 5;
 }
 
 message PlanFragment {
-	optional int64 query_id = 1;
-	optional int32 major_fragment_id = 2;
-	optional int32 minor_fragment_id = 3;
+	optional FragmentHandle handle = 1;
 	optional float network_cost = 4;
 	optional float cpu_cost = 5;
 	optional float disk_cost = 6;
 	optional float memory_cost = 7;
 	optional string fragment_json = 8;
-	optional bool self_driven = 9;
 	optional DrillbitEndpoint assignment = 10;
+	optional bool leaf_fragment = 9;
+	optional DrillbitEndpoint foreman = 11;
+
 }
 
 message FragmentHandle {
-	optional int32 major_fragment_id = 1;
-	optional int32 minor_fragment_id = 1;
+	optional exec.shared.QueryId query_id = 1;
+	optional int32 major_fragment_id = 2;
+	optional int32 minor_fragment_id = 3;
 }
 
 message WorkQueueStatus{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
index ebc7dca..48011bf 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/GeneralRPC.proto
@@ -2,7 +2,9 @@ package exec.rpc;
 
 option java_package = "org.apache.drill.exec.proto";
 option java_outer_classname = "GeneralRPCProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
+
+import "Coordination.proto";
 
 message Ack{
 	optional bool ok = 1;
@@ -33,3 +35,5 @@ message RpcFailure {
   optional string short_error = 3;
   optional string long_error = 4;
 }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
index 6e983d4..de0009a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/SchemaDef.proto
@@ -2,29 +2,62 @@ package exec;
 
 option java_package = "org.apache.drill.exec.proto";
 option java_outer_classname = "SchemaDefProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
 
 
 // Schema Definitions //
-enum DataType {
-  LATE = 0;
-  INT32 = 1;
-  INT64 = 2;
-  FLOAT32 = 3;
-  FLOAT64 = 4;
-  UTF8 = 5;
-  BYTES = 6;
+enum MinorType {
+    LATE = 0;   //  late binding type
+    MAP = 1;   //  an empty map column.  Useful for conceptual setup.  Children listed within here
+    REPEATMAP = 2;   //  a repeated map column (means that multiple children sit below this)
+    TINYINT = 3;   //  single byte signed integer
+    SMALLINT = 4;   //  two byte signed integer
+    INT = 5;   //  four byte signed integer
+    BIGINT = 6;   //  eight byte signed integer
+    DECIMAL4 = 7;   //  a decimal supporting precision between 1 and 8 (4 bits for decimal location, 1 sign)
+    DECIMAL8 = 8;   //  a decimal supporting precision between 9 and 18 (5 bits for decimal location, 1 sign)
+    DECIMAL12 = 9;   //  a decimal supporting precision between 19 and 28 (5 bits for decimal location, 1 sign)
+    DECIMAL16 = 10;   //  a decimal supporting precision between 29 and 37 (6 bits for decimal location, 1 sign)
+    MONEY = 11;   //  signed decimal with two digit precision
+    DATE = 12;   //  days since 4713bc 
+    TIME = 13;   //  time in micros before or after 2000/1/1
+    TIMETZ = 14;   //  time in micros before or after 2000/1/1 with timezone
+    TIMESTAMP = 15;   //  unix epoch time in millis
+    DATETIME = 16;   //  TBD
+    INTERVAL = 17;   //  TBD
+    FLOAT4 = 18;   //  4 byte ieee 754 
+    FLOAT8 = 19;   //  8 byte ieee 754
+    BOOLEAN = 20;   //  single bit value
+    FIXEDCHAR = 21;   //  utf8 fixed length string, padded with spaces
+    VARCHAR1 = 22;   //  utf8 variable length string (up to 2^8 in length)
+    VARCHAR2 = 23;   //  utf8 variable length string (up to 2^16 in length)
+    VARCHAR4 = 24;   //  utf8 variable length string (up to 2^32 in length)
+    FIXEDBINARY = 25;   //  fixed length binary, padded with 0 bytes
+    VARBINARY1 = 26;   //  variable length binary (up to 2^8 in length)
+    VARBINARY2 = 27;   //  variable length binary (up to 2^16 in length)
+    VARBINARY4 = 28;   //  variable length binary (up to 2^32 in length)
+    UINT1 = 29;   //  unsigned 1 byte integer
+    UINT2 = 30;   //  unsigned 2 byte integer
+    UINT4 = 31;   //  unsigned 4 byte integer
+    UINT8 = 32;   //  unsigned 8 byte integer
+    PROTO2 = 33;   //  protobuf encoded complex type. (up to 2^16 in length)
+    PROTO4 = 34;   //  protobuf encoded complex type. (up to 2^32 in length)
+    MSGPACK2 = 35;   //  msgpack encoded complex type. (up to 2^16 in length)
+    MSGPACK4 = 36;   //  msgpack encoded complex type. (up to 2^32 in length)
 }
 
-enum DataMode {
-  REQUIRED = 0;
-  OPTIONAL = 1;
-  REPEATED = 2;
-  MAP = 3; 
+message MajorType {
+  optional MinorType minor_type = 1;
+  optional DataMode mode = 2;
+  optional int32 width = 3; // optional width for fixed size values.
+  optional int32 precision = 4; // used for decimal types
+  optional int32 scale = 5; // used for decimal types 
 }
 
-message SchemaDef {
-  repeated FieldDef field = 1;
+enum DataMode {
+  OPTIONAL = 0; // nullable
+  REQUIRED = 1; // non-nullable
+  REPEATED = 2; // single, repeated-field
 }
 
 enum ValueMode {
@@ -33,12 +66,21 @@ enum ValueMode {
 	DICT = 2;
 }
 
+message NamePart {
+  enum Type{
+    NAME = 0;
+    ARRAY = 1;
+  }
+  
+  optional Type type = 1;
+  optional string name = 2; // only required if this is a named type.
+}
+
 message FieldDef {
-  optional string name = 1;
-  optional DataMode data_mode = 2;
-  optional ValueMode value_mode = 3;
+  optional int32 field_id = 1;
+  optional int32 parent_id = 2; // the field_id of the parent of this field.  populated when this is a repeated field.  a field_id of 0 means that the record is the parent of this repeated field.
+  repeated NamePart name = 3; // multipart description of entire field name
+  optional MajorType major_type = 4; // the type associated with this field.
+  repeated FieldDef field = 5; // only in the cases of type == MAP or REPEATMAP
   
-  // If DataMode == 0-2, type should be populated and fields should be empty.  Otherwise, type should empty and fields should be defined. 
-  optional DataType type = 4;
-  repeated FieldDef fields = 5;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
index 225d1a0..cbf5b4c 100644
--- a/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/User.proto
@@ -2,8 +2,12 @@ package exec.user;
 
 option java_package = "org.apache.drill.exec.proto";
 option java_outer_classname = "UserProtos";
-option optimize_for = LITE_RUNTIME;
+option optimize_for = SPEED;
+
 import "SchemaDef.proto";
+import "UserBitShared.proto";
+
+
 
 ////// UserToBit RPC ///////
 enum RpcType {
@@ -13,7 +17,8 @@ enum RpcType {
     
     // user to bit
     RUN_QUERY = 3;
-    REQUEST_RESULTS = 4;
+    CANCEL_QUERY = 4;
+    REQUEST_RESULTS = 5;
     
     // bit to user
 	QUERY_RESULT = 6;
@@ -21,33 +26,36 @@ enum RpcType {
 }
 
 message UserToBitHandshake {
-    optional bool support_listening = 1;
-    optional int32 rpc_version = 2;
+    optional bool support_listening = 2;
+    optional int32 rpc_version = 3;
 }
 
 message RequestResults {
-  optional int64 query_id = 1;
+  optional exec.shared.QueryId query_id = 1;
   optional int32 maximum_responses = 2;
 }
 
 message RunQuery {
-  optional QueryResultsMode mode = 1;
-  optional string plan = 2;
+  optional QueryResultsMode results_mode = 1;
+  optional QueryType type = 2;
+  optional string plan = 3;
+}
+
+enum QueryType {
+  SQL = 1;
+  LOGICAL = 2;
+  PHYSICAL = 3;
 }
 
 enum QueryResultsMode {
 	STREAM_FULL = 1; // Server will inform the client regularly on the status of the query. Once the query is completed, service will inform the client as each query chunk is made available.
-	STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query.  Once the query is completed, server will inform the client of the first query chunk.
-	QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
+	// STREAM_FIRST = 2; // Server will inform the client regularly on the status of the query.  Once the query is completed, server will inform the client of the first query chunk.
+	// QUERY_FOR_STATUS = 3; // Client will need to query for status of query.
 }
 
 
 message BitToUserHandshake {
-	optional int32 rpc_version = 1;
-}
-
-message QueryHandle {
-  	optional int64 query_id = 1;
+	optional int32 rpc_version = 2;
 }
 
 message NodeStatus {
@@ -56,37 +64,26 @@ message NodeStatus {
 }
 
 message QueryResult {
-	enum Outcome {
-	  RUNNING = 0;
-	  FAILED = 1;
+	enum QueryState {
+	  PENDING = 0;
+	  RUNNING = 1;
 	  COMPLETED = 2;
-	  WAITING = 3;
+	  CANCELED = 3;
+	  FAILED = 4;
+	  UNKNOWN_QUERY = 5;
 	}
 	
-	optional Outcome outcome = 1;
-	optional SchemaDef schema = 2;
+	optional QueryState query_state = 1;
+	optional exec.shared.QueryId query_id = 2;
 	optional bool is_last_chunk = 3;
 	optional int32 row_count = 4;
 	optional int64 records_scan = 5;
 	optional int64 records_error = 6;
 	optional int64 submission_time = 7;
 	repeated NodeStatus node_status = 8;	
-	repeated Error error = 9;
-}
-
-message TextErrorLocation{
-    optional int32 start_column = 2;
-    optional int32 start_row = 3;
-    optional int32 end_column = 4;
-    optional int32 end_row = 5;
-}
-
-message Error{
-    optional int64 error_id = 1; // for debug tracing purposes
-    optional string host = 2;
-    optional int32 error_type = 3; 
-    optional string message = 4;
-    optional TextErrorLocation error = 5; //optional, used when providing location of error within a piece of text.
+	repeated exec.shared.DrillPBError error = 9;
+	optional exec.shared.RecordBatchDef def = 10;
+	optional bool schema_changed = 11;
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto b/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
new file mode 100644
index 0000000..5643c0f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/protobuf/UserBitShared.proto
@@ -0,0 +1,46 @@
+package exec.shared;
+
+option java_package = "org.apache.drill.exec.proto";
+option java_outer_classname = "UserBitShared";
+option optimize_for = SPEED;
+
+import "Coordination.proto";
+import "SchemaDef.proto";
+
+// Query identifier made of two signed, fixed-width 64-bit parts.
+// UserWorker.submitWork fills them from the most / least significant bits of a random UUID.
+message QueryId {
+  	optional sfixed64 part1 = 1;
+  	optional sfixed64 part2 = 2;
+}
+
+// Error description shared by the user and bit protocols; it is referenced from
+// FragmentStatus.error and QueryResult.error.
+message DrillPBError{
+    optional string error_id = 1; // for debug tracing purposes
+    optional DrillbitEndpoint endpoint = 2;
+    optional int32 error_type = 3; 
+    optional string message = 4;
+    repeated ParsingError parsing_error = 5; //optional, used when providing location of error within a piece of text.
+}
+
+// Start and end position (column / row) of an error inside a piece of text; used by
+// DrillPBError.parsing_error. Field numbering starts at 2; tag 1 is not used.
+message ParsingError{
+    optional int32 start_column = 2;
+    optional int32 start_row = 3;
+    optional int32 end_column = 4;
+    optional int32 end_row = 5;
+}
+
+// Empty message: no fields are declared yet.
+message RecordBatch{
+    
+}
+
+// Definition of a record batch: one FieldMetadata entry per field plus a record count.
+// Referenced from FragmentRecordBatch.def and QueryResult.def.
+message RecordBatchDef {
+	repeated FieldMetadata field = 1;
+	optional int32 record_count = 2;
+	
+}
+
+// Metadata for one field of a record batch: its FieldDef plus count and length figures.
+// NOTE(review): var_byte_length and buffer_length are presumably byte sizes -- confirm.
+message FieldMetadata {
+  optional FieldDef def = 1;
+  optional int32 value_count = 2;
+  optional int32 var_byte_length = 3;
+  optional int32 group_count = 4; // number of groups.  (number of repeated records)
+  optional int32 buffer_length = 5;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
index 37ba12b..d113ca3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java
@@ -103,4 +103,11 @@ public class DrillSystemTestBase {
     }
   }
 
+  /** Returns the first Drillbit produced by iterating over the servers collection. */
+  public Drillbit getABit(){
+    final Drillbit first = servers.iterator().next();
+    return first;
+  }
+  
+  /** Returns the static DrillConfig held by this test base class. */
+  public static DrillConfig getConfig(){
+    return DrillSystemTestBase.config;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
index 09a06d7..dc463e3 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/client/DrillClientSystemTest.java
@@ -1,17 +1,20 @@
 package org.apache.drill.exec.client;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
+import java.util.List;
+
 import org.apache.drill.exec.DrillSystemTestBase;
-import org.apache.drill.exec.proto.UserProtos;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.junit.After;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-/**
- * @author David Alves
- */
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+
+@Ignore
 public class DrillClientSystemTest extends DrillSystemTestBase {
 
   private static String plan;
@@ -34,8 +37,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
     startCluster(1);
     DrillClient client = new DrillClient();
     client.connect();
-    DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
-    System.out.println(result.get());
+    List<QueryResultBatch> result = client.runQuery(QueryType.LOGICAL, plan);
+    System.out.println(result);
     client.close();
   }
 
@@ -45,8 +48,8 @@ public class DrillClientSystemTest extends DrillSystemTestBase {
     startCluster(2);
     DrillClient client = new DrillClient();
     client.connect();
-    DrillRpcFuture<UserProtos.QueryHandle> result = client.submitPlan(plan);
-    System.out.println(result.get());
+    List<QueryResultBatch> result = client.runQuery(QueryType.LOGICAL, plan);
+    System.out.println(result);
     client.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
index 3d5d84e..2f8aa18 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/compile/TestClassCompilationTypes.java
@@ -18,14 +18,15 @@
 package org.apache.drill.exec.compile;
 
 import org.codehaus.commons.compiler.jdk.ExpressionEvaluator;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TestClassCompilationTypes {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestClassCompilationTypes.class);
 
-  @Test
+  @Ignore @Test
   public void comparePerfs() throws Exception {
-    for(int i =0; i < 50000; i++){
+    for(int i =0; i < 500; i++){
       int r = 0;
       long n0 = System.nanoTime();
       r += janino();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
new file mode 100644
index 0000000..3b6bf6a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/config/ParsePhysicalPlan.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.junit.Test;
+
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+public class ParsePhysicalPlan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParsePhysicalPlan.class);
+
+  /** Parses /physical_test1.json into a PhysicalPlan and prints the plan as unparsed by an ObjectWriter. */
+  @Test
+  public void parseSimplePlan() throws Exception{
+    DrillConfig config = DrillConfig.create();
+    ObjectReader planReader = config.getMapper().reader(PhysicalPlan.class);
+    ObjectWriter planWriter = config.getMapper().writer();
+    String planJson = Files.toString(FileUtils.getResourceAsFile("/physical_test1.json"), Charsets.UTF_8);
+    System.out.println(PhysicalPlan.parse(planReader, planJson).unparse(planWriter));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
new file mode 100644
index 0000000..7c6bfe5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/DistributedFragmentRun.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import java.util.List;
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+@Ignore
+public class DistributedFragmentRun extends PopUnitTestBase{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedFragmentRun.class);
+  
+  
+  @Test 
+  public void simpleDistributedQuery() throws Exception{
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    try(Drillbit bit1 = new Drillbit(CONFIG, serviceSet); Drillbit bit2 = new Drillbit(CONFIG, serviceSet); DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+      bit1.run();
+      bit2.run();
+      client.connect();
+      List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_single_exchange.json"), Charsets.UTF_8));
+      System.out.println(results);
+    }
+    
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
new file mode 100644
index 0000000..6755bb6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleFragmentRun.java
@@ -0,0 +1,100 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.physical.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.junit.Test;
+
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/** Runs the /physical_test2.json physical plan on one Drillbit and checks how many records come back. */
+public class SimpleFragmentRun extends PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleFragmentRun.class);
+
+  /** Submits the plan through a DrillClient, prints every returned batch to stdout and expects 200 records in total. */
+  @Test
+  public void runNoExchangeFragment() throws Exception {
+    try(RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+        Drillbit bit = new Drillbit(CONFIG, serviceSet);
+        DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator());){
+
+    // run query.
+    bit.run();
+    client.connect();
+    List<QueryResultBatch> results = client.runQuery(QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/physical_test2.json"), Charsets.UTF_8));
+
+    // look at records
+    RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+    int recordCount = 0;
+    for (QueryResultBatch batch : results) {
+      if(!batch.hasData()) continue;
+      boolean schemaChanged = batchLoader.load(batch.getHeader().getDef(), batch.getData());
+      boolean firstColumn = true;
+
+      // print headers.
+      if (schemaChanged) {
+        System.out.println("\n\n========NEW SCHEMA=========\n\n");
+        for (IntObjectCursor<ValueVector<?>> v : batchLoader) {
+          if (firstColumn) {
+            firstColumn = false;
+          } else {
+            System.out.print("\t");
+          }
+          System.out.print(v.value.getField().getName());
+          System.out.print("[");
+          System.out.print(v.value.getField().getType().getMinorType());
+          System.out.print("]");
+        }
+        System.out.println();
+      }
+
+      for (int i = 0; i < batchLoader.getRecordCount(); i++) {
+        boolean first = true;
+        recordCount++;
+        for (IntObjectCursor<ValueVector<?>> v : batchLoader) {
+          if (first) {
+            first = false;
+          } else {
+            System.out.print("\t");
+          }
+          System.out.print(v.value.getObject(i));
+        }
+        if(!first) System.out.println();
+      }
+
+    }
+    logger.debug("Received results {}", results);
+    // assertEquals takes the expected value first, so a failure reports "expected 200 but was <count>".
+    assertEquals(200, recordCount);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
index 98bb874..7b7ab8e 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckFragmenter.java
@@ -24,63 +24,63 @@ import static org.junit.Assert.assertNull;
 import java.io.IOException;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.physical.PhysicalPlan;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.planner.FragmentNode;
-import org.apache.drill.exec.planner.FragmentingPhysicalVisitor;
-import org.apache.drill.exec.planner.FragmentNode.ExchangeFragmentPair;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.SimpleExecPlanner;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 
-public class CheckFragmenter {
+public class CheckFragmenter extends PopUnitTestBase {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckFragmenter.class);
-  
-  static DrillConfig config;
-  
-  @BeforeClass
-  public static void setup(){
-    config = DrillConfig.create();
-  }
-  
+
+
   @Test
-  public void ensureOneFragment() throws FragmentSetupException, IOException{
-    FragmentNode b = getRootFragment("/physical_test1.json");
+  public void ensureOneFragment() throws FragmentSetupException, IOException {
+    PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+    Fragment b = getRootFragment(ppr, "/physical_test1.json");
     assertEquals(1, getFragmentCount(b));
     assertEquals(0, b.getReceivingExchangePairs().size());
     assertNull(b.getSendingExchange());
   }
-  
+
   @Test
-  public void ensureTwoFragments() throws FragmentSetupException, IOException{
-    FragmentNode b = getRootFragment("/physical_simpleexchange.json");
-    assertEquals(2, getFragmentCount(b));
+  public void ensureThreeFragments() throws FragmentSetupException, IOException {
+    PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+    Fragment b = getRootFragment(ppr, "/physical_simpleexchange.json");
+    logger.debug("Fragment Node {}", b);
+    assertEquals(3, getFragmentCount(b));
     assertEquals(1, b.getReceivingExchangePairs().size());
     assertNull(b.getSendingExchange());
-    
+
     // get first child.
     b = b.iterator().next().getNode();
+    assertEquals(1, b.getReceivingExchangePairs().size());
+    assertNotNull(b.getSendingExchange());
+
+    b = b.iterator().next().getNode();
     assertEquals(0, b.getReceivingExchangePairs().size());
     assertNotNull(b.getSendingExchange());
   }
+
   
-  private int getFragmentCount(FragmentNode b){
-    int i =1;
-    for(ExchangeFragmentPair p : b){
-      i += getFragmentCount(p.getNode());
-    }
-    return i;
-  }
+
+
   
-  private FragmentNode getRootFragment(String file) throws FragmentSetupException, IOException{
-    FragmentingPhysicalVisitor f = new FragmentingPhysicalVisitor();
-    
-    PhysicalPlan plan = PhysicalPlan.parse(config.getMapper().reader(PhysicalPlan.class), Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
-    PhysicalOperator o = plan.getSortedOperators(false).iterator().next();
-    return o.accept(f, null);
-  }
+  
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
index b8fd278..1d0fb91 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/CheckInjectionValue.java
@@ -22,12 +22,12 @@ import static org.junit.Assert.*;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.physical.PhysicalPlan;
-import org.apache.drill.common.physical.pop.Screen;
-import org.apache.drill.common.physical.pop.base.PhysicalOperator;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StorageEngineRegistry;
 import org.junit.BeforeClass;
@@ -48,8 +48,8 @@ public class CheckInjectionValue {
   
   @Test
   public void testInjected() throws Exception{
-    PhysicalPlanReader r = new PhysicalPlanReader(config.getMapper(), DrillbitEndpoint.getDefaultInstance());
-    PhysicalPlan p = r.read(Files.toString(FileUtils.getResourceAsFile("/physical_screen.json"), Charsets.UTF_8));
+    PhysicalPlanReader r = new PhysicalPlanReader(config, config.getMapper(), DrillbitEndpoint.getDefaultInstance());
+    PhysicalPlan p = r.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/physical_screen.json"), Charsets.UTF_8));
     
     List<PhysicalOperator> o = p.getSortedOperators(false);
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
new file mode 100644
index 0000000..6f229a3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/FragmentChecker.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop;
+
+import static org.junit.Assert.*;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.StatsCollector;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/** Checks the plan fragments that SimpleParallelizer produces for /physical_simpleexchange.json. */
+public class FragmentChecker extends PopUnitTestBase{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentChecker.class);
+
+  @Test
+  public void checkSimpleExchangePlan() throws Exception{
+    // Build the root fragment of the plan and collect its planning statistics.
+    PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance());
+    Fragment fragmentRoot = getRootFragment(ppr, "/physical_simpleexchange.json");
+    PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
+    SimpleParallelizer par = new SimpleParallelizer();
+
+    DrillbitEndpoint b1 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(1234).build();
+    DrillbitEndpoint b2 = DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(2345).build();
+
+    QueryWorkUnit qwu = par.getFragments(b1, QueryId.getDefaultInstance(), Lists.newArrayList(b1, b2), ppr, fragmentRoot, planningSet, 10);
+    // assertEquals takes the expected value first, so a failure reports "expected 3 but was <size>".
+    assertEquals(3, qwu.getFragments().size());
+    System.out.println("=========ROOT FRAGMENT=========");
+    System.out.print(qwu.getRootFragment().getFragmentJson());
+
+    // Dump the JSON of each fragment in the work unit for manual inspection.
+    for(PlanFragment f : qwu.getFragments()){
+      System.out.println("=========");
+      System.out.print(f.getFragmentJson());
+    }
+    logger.debug("Planning Set {}", planningSet);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
new file mode 100644
index 0000000..e5e109e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/pop/PopUnitTestBase.java
@@ -0,0 +1,71 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.pop;
+
+import java.io.IOException;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
+import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/** Base class for physical-operator tests: shared config, a per-test timeout rule and fragment helpers. */
+public abstract class PopUnitTestBase {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PopUnitTestBase.class);
+
+  // True when a JDWP agent option shows up among the JVM input arguments (i.e. a debugger is attached).
+  static final boolean IS_DEBUG = java.lang.management.ManagementFactory.getRuntimeMXBean().getInputArguments().toString().indexOf("-agentlib:jdwp") > 0;
+  protected static DrillConfig CONFIG;
+
+  // Enforce a 10000 ms timeout per test; while debugging, TestName is used instead so no timeout applies.
+  @Rule public TestRule globalTimeout = IS_DEBUG ? new TestName() : new Timeout(10000);
+
+  @BeforeClass
+  public static void setup() {
+    CONFIG = DrillConfig.create();
+  }
+
+  /** Counts the given fragment plus, recursively, every fragment reached through its exchange pairs. */
+  public static int getFragmentCount(Fragment b) {
+    int total = 1;
+    for (ExchangeFragmentPair child : b) {
+      total += getFragmentCount(child.getNode());
+    }
+    return total;
+  }
+
+  /** Reads the plan resource with the reader and fragments it from the first operator of getSortedOperators(false). */
+  public Fragment getRootFragment(PhysicalPlanReader reader, String file) throws FragmentSetupException, IOException {
+    PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8));
+    PhysicalOperator root = plan.getSortedOperators(false).iterator().next();
+    return root.accept(new MakeFragmentsVisitor(), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
deleted file mode 100644
index d003373..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/RunRemoteQuery.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.junit.Test;
-
-public class RunRemoteQuery {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RunRemoteQuery.class);
-  
-  @Test 
-  public void runRemoteQuery() throws Exception{
-    UserClient c = new UserClient(UnpooledByteBufAllocator.DEFAULT, new NioEventLoopGroup(1));
-    c.connectAsClient("localhost", 31010);
-    DrillRpcFuture<QueryHandle> futureHandle = c.submitQuery(RunQuery.getDefaultInstance());
-    QueryHandle h = futureHandle.checkedGet();
-    System.out.println(h);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
deleted file mode 100644
index c8ce877..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/UserRpcTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.rpc.user;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.channel.nio.NioEventLoopGroup;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.exec.proto.UserProtos.QueryHandle;
-import org.apache.drill.exec.proto.UserProtos.QueryResultsMode;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.junit.Test;
-
-public class UserRpcTest {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcTest.class);
-  
-  
-  
-  
-  @Test
-  public void doBasicRpcTest() throws Exception {
-    final int bufferSize = 25000;
-    final int batchSize = 1000;
-    final int batchCount = 100;
-
-    
-    int sends = 0;
-    int receives = 0;
-    long nanoSend = 0;
-    long nanoReceive = 0;
-
-    
-    try {
-      ByteBufAllocator bb = new PooledByteBufAllocator(true);
-//      ByteBufAllocator bb = UnpooledByteBufAllocator.DEFAULT;
-      UserServer s = new UserServer(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Server-")), null);
-      s.bind(31515);
-
-      logger.debug("Starting user client.");
-      UserClient c = new UserClient(bb, new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
-
-      logger.debug("Connecting as client to server.");
-      c.connectAsClient("localhost", 31515);
-
-      
-      @SuppressWarnings("unchecked")
-      DrillRpcFuture<QueryHandle>[] handles = new DrillRpcFuture[batchSize];
-
-      for (int x = 0; x < batchCount; x++) {
-        long s1 = System.nanoTime();
-        for (int i = 0; i < batchSize; i++) {
-          sends++;
-          ByteBuf rawBody = bb.buffer(bufferSize);
-          rawBody.writerIndex(bufferSize);
-          if(rawBody.readableBytes() != bufferSize) throw new RuntimeException();
-          handles[i] = c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build(), rawBody);
-        }
-        
-        long s2 = System.nanoTime();
-
-        for (int i = 0; i < batchSize; i++) {
-          handles[i].checkedGet(2, TimeUnit.SECONDS).getQueryId();
-          receives++;
-        }
-
-        long s3 = System.nanoTime();
-        nanoSend += (s2-s1);
-        nanoReceive += (s3-s2);
-        logger.debug("Submission time {}ms, return time {}ms", (s2 - s1) / 1000 / 1000, (s3 - s2) / 1000 / 1000);
-      }
-      // logger.debug("Submitting query.");
-      // DrillRpcFuture<QueryHandle> handleFuture =
-      // c.submitQuery(RunQuery.newBuilder().setMode(QueryResultsMode.QUERY_FOR_STATUS).build());
-      //
-      // logger.debug("Got query id handle of {}", handleFuture.get(2, TimeUnit.SECONDS).getQueryId());
-    } catch (Exception e) {
-      logger.error("Exception of type {} occurred while doing test.", e.getClass().getCanonicalName());
-      throw e;
-    } finally{
-      long mbsTransferred = (1l * bufferSize * batchSize * batchCount)/1024/1024;
-      double sSend = nanoSend*1.0d/1000/1000/1000;
-      double sReceive = nanoReceive*1.0d/1000/1000/1000;
-      logger.info(String.format("Completed %d sends and %d receives.  Total data transferred was %d.  Send bw: %f, Receive bw: %f.", sends, receives, mbsTransferred, mbsTransferred*1.0/sSend, mbsTransferred*1.0/sReceive));
-      logger.info("Completed {} sends and {} receives.", sends, receives);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
new file mode 100644
index 0000000..9684e9f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestBitRpc.java
@@ -0,0 +1,84 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.buffer.ByteBuf;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.bit.BitClient;
+import org.apache.drill.exec.rpc.bit.BitComImpl;
+import org.apache.drill.exec.rpc.bit.BitConnection;
+import org.apache.drill.exec.rpc.bit.BitRpcConfig;
+import org.apache.drill.exec.rpc.bit.BitServer;
+import org.apache.drill.exec.rpc.bit.ListenerPool;
+import org.apache.drill.exec.work.batch.BitComHandler;
+import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class TestBitRpc {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestBitRpc.class);
+  // Smoke test: binds a BitServer, then connects ten BitClients to it. It has no assertions, so it fails only if a call throws.
+  @Test
+  public void testBasicConnectionAndHandshake() throws Exception{
+    int port = 1234;
+    BootStrapContext c = new BootStrapContext(DrillConfig.create());
+    ConcurrentMap<DrillbitEndpoint, BitConnection> registry = Maps.newConcurrentMap(); // the same registry instance is handed to the server and to every client
+    BitServer server = new BitServer(new BitComTestHandler(), c, registry, new ListenerPool(2));
+    port = server.bind(port); // NOTE(review): presumably bind() returns the port actually bound, which may differ from 1234 - confirm
+    for(int i =0; i < 10; i++){
+      BitClient client = new BitClient(DrillbitEndpoint.newBuilder().setAddress("localhost").setBitPort(port).build(), null, new BitComTestHandler(), c, registry, new ListenerPool(2));
+      client.connect();
+      // NOTE(review): no close/shutdown call is made on the clients, the server or the context in this test - confirm whether cleanup is needed.
+    }
+    System.out.println("connected");
+  }
+  
+  /**
+   * Stub BitComHandler for this test: handle() always answers BitRpcConfig.OK, cancelFragment() returns null, and the other callbacks do nothing.
+   */
+  private class BitComTestHandler implements BitComHandler{
+
+    @Override
+    public Response handle(BitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+      return BitRpcConfig.OK;
+    }
+
+    @Override
+    public void startNewRemoteFragment(PlanFragment fragment) {
+    }
+
+    @Override
+    public Ack cancelFragment(FragmentHandle handle) {
+      return null;
+    }
+
+    @Override
+    public void registerIncomingFragmentHandler(IncomingFragmentHandler handler) {
+    }
+    
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
deleted file mode 100644
index 18c6955..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.logical.StorageEngineConfigBase;
-
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-@JsonTypeName("mock-config")
-public class MockRecordConfig extends StorageEngineConfigBase{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordConfig.class);
-  
-  private int recordCount;
-  private DataType[] types;
-  
-  public int getRecordCount() {
-    return recordCount;
-  }
-  public void setRecordCount(int recordCount) {
-    this.recordCount = recordCount;
-  }
-  public DataType[] getTypes() {
-    return types;
-  }
-  public void setTypes(DataType[] types) {
-    this.types = types;
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
deleted file mode 100644
index e1f56bd..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockRecordReader.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.exec.exception.ExecutionSetupException;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OutputMutator;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.vector.Int16Vector;
-import org.apache.drill.exec.record.vector.Int32Vector;
-import org.apache.drill.exec.record.vector.ValueVector;
-
-public class MockRecordReader implements RecordReader {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
-
-  private BatchSchema expectedSchema;
-  private OutputMutator output;
-  private MockRecordConfig config;
-  private FragmentContext context;
-  private ValueVector<?>[] valueVectors;
-  private int recordsRead;
-
-  public MockRecordReader(FragmentContext context, MockRecordConfig config) {
-    this.config = config;
-  }
-
-  private int getEstimatedRecordSize(DataType[] types) {
-    int x = 0;
-    for (int i = 0; i < types.length; i++) {
-      x += getEstimatedColumnSize(i);
-    }
-    return x;
-  }
-
-  private int getEstimatedColumnSize(int fieldId) {
-    return 4;
-  }
-
-  private ValueVector<?> getVector(int fieldId, DataType dt, int length) {
-    ValueVector<?> v;
-    if (dt == DataType.INT16) {
-      v = new Int16Vector(fieldId, context.getAllocator());
-    } else if (dt == DataType.INT32) {
-      v = new Int32Vector(fieldId, context.getAllocator());
-    } else {
-      throw new UnsupportedOperationException();
-    }
-    v.allocateNew(length);
-    return v;
-
-  }
-
-  @Override
-  public void setup(BatchSchema expectedSchema, OutputMutator output) throws ExecutionSetupException {
-    try {
-      this.expectedSchema = expectedSchema;
-      this.output = output;
-      int estimateRowSize = getEstimatedRecordSize(config.getTypes());
-      valueVectors = new ValueVector<?>[config.getTypes().length];
-      int batchRecordCount = 250000 / estimateRowSize;
-
-      for (int i = 0; i < config.getTypes().length; i++) {
-        valueVectors[i] = getVector(i, config.getTypes()[i], batchRecordCount);
-        output.addField(i, valueVectors[i]);
-      }
-    } catch (SchemaChangeException e) {
-      throw new ExecutionSetupException("Failure while setting up fields", e);
-    }
-
-  }
-
-  @Override
-  public int next() {
-    int recordSetSize = Math.min(valueVectors[0].size(), this.config.getRecordCount()- recordsRead);
-    recordsRead += recordSetSize;
-    return recordSetSize;
-  }
-
-  @Override
-  public void cleanup() {
-    for (int i = 0; i < valueVectors.length; i++) {
-      try {
-        output.removeField(valueVectors[i].getField().getFieldId());
-      } catch (SchemaChangeException e) {
-        logger.warn("Failure while trying tremove field.", e);
-      }
-      valueVectors[i].close();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
deleted file mode 100644
index cc82540..0000000
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockStorageEngine.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.store;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import org.apache.drill.common.logical.data.Scan;
-import org.apache.drill.common.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.ops.FragmentContext;
-
-import com.google.common.collect.ListMultimap;
-
-public class MockStorageEngine extends AbstractStorageEngine{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
-
-  @Override
-  public boolean supportsRead() {
-    return true;
-  }
-
-  @Override
-  public Collection<ReadEntry> getReadEntries(Scan scan) throws IOException {
-    return null;
-  }
-
-  @Override
-  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
-    return null;
-  }
-
-  @Override
-  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
-    return null;
-  }
-
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
index 8785736..771a2fd 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
@@ -7,6 +7,9 @@ drill.exec: {
   	user.port : 31010,
   	bit.port : 32010
   },
+  operator: {
+    packages += "org.apache.drill.exec.physical.config"
+  },
   optimizer: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
   },
@@ -30,4 +33,4 @@ drill.exec: {
   network: {
     start: 35000
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
index b79b811..3af6e10 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/logback.xml
@@ -1,16 +1,14 @@
 <?xml version="1.0" encoding="UTF-8" ?>
 <configuration>
-
+<!-- 
   <appender name="SOCKET" class="de.huxhorn.lilith.logback.appender.ClassicMultiplexSocketAppender">
     <Compressing>true</Compressing> 
     <ReconnectionDelay>10000</ReconnectionDelay>
     <IncludeCallerData>true</IncludeCallerData>
     <RemoteHosts>localhost</RemoteHosts>
   </appender>
-
+-->
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <!-- encoders are assigned the type
-         ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
     <encoder>
       <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
     </encoder>
@@ -30,14 +28,14 @@
   --> 
   <logger name="org.apache.drill" additivity="false">
     <level value="debug" />
-    <appender-ref ref="SOCKET" />
+<!--     <appender-ref ref="SOCKET" /> -->
     <appender-ref ref="STDOUT" />
 <!--     <appender-ref ref="FILE" /> -->
   </logger>
 
   <root>
     <level value="error" />
-    <appender-ref ref="SOCKET" />
+<!--     <appender-ref ref="SOCKET" /> -->
     <appender-ref ref="STDOUT" />
 <!--     <appender-ref ref="FILE" /> -->
   </root>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
index c63aef1..8bb97db 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_screen.json
@@ -12,9 +12,8 @@
             pop:"mock-scan",
             url: "http://apache.org",
             entries:[
-            	{id:1}
-            ],
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+            	{id:1, records: 100, size: 100}
+            ]
         },
         {
             @id:2,

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
index e332785..85823cf 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_simpleexchange.json
@@ -12,30 +12,44 @@
             pop:"mock-scan",
             url: "http://apache.org",
             entries:[
-            	{id:1}
-            ],
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]},
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
         },
         {
             @id:2,
-            pop: "partition-to-random-exchange",
-            child: 1,
-            partition: {
-              mode: "DUPLICATE"
-            }
+            pop: "hash-to-random-exchange",
+            child: 1
         },
         {
             @id:3,
             child: 2,
             pop:"filter",
             expr: "b > 5",
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+            selectivity: 0.8
         },
         {
             @id: 4,
             child: 3,
-            pop: "mock-store",
-            cost: { disk: 1.0, memory: 1.0, cpu: 1.0, network: 1.0}
+            pop: "mock-store"
+        },
+        {
+            @id:5,
+            child: 4,
+            pop: "union-exchange"
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
         }
     ]
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
new file mode 100644
index 0000000..675ecfb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_single_exchange.json
@@ -0,0 +1,34 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            	
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop: "union-exchange"
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
new file mode 100644
index 0000000..0ddd48f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test1.json
@@ -0,0 +1,40 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+	graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
+        },
+        {
+            @id:2,
+            child: 1,
+            pop:"filter",
+            expr: "b > 5",
+            selectivity: 0.80
+        },
+        {
+            @id: 3,
+            child: 2,
+            pop: "mock-store"
+        },
+        {
+            @id: 4,
+            child: 3,
+            pop: "screen"
+        }
+    ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
new file mode 100644
index 0000000..b001921
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/physical_test2.json
@@ -0,0 +1,34 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-scan",
+            url: "http://apache.org",
+            entries:[
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]},
+            	{records: 100, types: [
+            	  {name: "blue", type: "INT", mode: "REQUIRED"},
+            	  {name: "red", type: "BIGINT", mode: "REQUIRED"},
+            	  {name: "green", type: "INT", mode: "REQUIRED"}
+            	]}
+            ]
+        },
+        {
+            @id: 2,
+            child: 1,
+            pop: "screen"
+        }
+    ]    
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/ref/src/test/resources/donuts.json
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/ref/src/test/resources/donuts.json b/sandbox/prototype/exec/ref/src/test/resources/donuts.json
index 2d98b59..9fd6e3e 100644
--- a/sandbox/prototype/exec/ref/src/test/resources/donuts.json
+++ b/sandbox/prototype/exec/ref/src/test/resources/donuts.json
@@ -20,7 +20,7 @@
 			[
 				{ "id": "5001", "type": "None" },
 				{ "id": "5002", "type": "Glazed" },
-				{ "id": "5005", "type": "Sugar" },
+				{ "id": "5005", "type": "Sugar", color: "White"},
 				{ "id": "5007", "type": "Powdered Sugar" },
 				{ "id": "5006", "type": "Chocolate with Sprinkles" },
 				{ "id": "5003", "type": "Chocolate" },