You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:09 UTC

[5/9] basic framework for physical plan. abstraction of graph classes.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
new file mode 100644
index 0000000..dd00e04
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitComImpl.java
@@ -0,0 +1,142 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.channel.Channel;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.Future;
+
+import java.util.Map;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+
+public class BitComImpl implements BitCom {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitComImpl.class);
+
+  private Map<DrillbitEndpoint, BitTunnel> tunnels = Maps.newConcurrentMap();
+  private Map<SocketChannel, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
+  private Object lock = new Object();
+  private BitServer server;
+  private DrillbitContext context;
+
+  public BitComImpl(DrillbitContext context) {
+    this.context = context;
+  }
+
+  public int start() throws InterruptedException, DrillbitStartupException {
+    server = new BitServer(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup(), context);
+    int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
+    return server.bind(port);
+  }
+
+  private Future<BitTunnel> getNode(DrillbitEndpoint endpoint) {
+    return null;
+    
+//    BitTunnel t = tunnels.get(endpoint);
+//    if (t == null) {
+//      synchronized (lock) {
+//        t = tunnels.get(endpoint);
+//        if (t != null) return t;
+//        BitClient c = new BitClient(new BitComHandler(modifier), context.getAllocator().getUnderlyingAllocator(),
+//            context.getBitLoopGroup(), context);
+//
+//        // need to figure what to do here with regards to waiting for handshake before returning. Probably need to add
+//        // future registry so that new endpoint registration ping the registry.
+//        throw new UnsupportedOperationException();
+//        c.connectAsClient(endpoint.getAddress(), endpoint.getBitPort()).await();
+//        t = new BitTunnel(c);
+//        tunnels.put(endpoint, t);
+//
+//      }
+//    }
+//    return null;
+  }
+
+  @Override
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, DrillbitEndpoint node, RecordBatch batch) {
+    return null;
+  }
+
+  @Override
+  public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, DrillbitEndpoint node,
+      PlanFragment fragment) {
+    return null;
+  }
+
+  @Override
+  public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, DrillbitEndpoint node, FragmentHandle handle) {
+    return null;
+  }
+
+  @Override
+  public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, DrillbitEndpoint node,
+      FragmentHandle handle) {
+    return null;
+  }
+
+  private final TunnelModifier modifier = new TunnelModifier();
+
+  /**
+   * Fully synchronized modifier. Contention should be low since endpoints shouldn't be constantly changing.
+   */
+  class TunnelModifier {
+    public BitTunnel remove(Channel ch) {
+      synchronized (this) {
+        DrillbitEndpoint endpoint = endpoints.remove(ch);
+        if (endpoint == null) {
+          logger
+              .warn("We attempted to find a endpoint from a provided channel and found none.  This suggests a race condition or memory leak problem.");
+          return null;
+        }
+
+        BitTunnel tunnel = tunnels.remove(endpoint);
+        return tunnel;
+      }
+    }
+
+    public void create(SocketChannel channel, DrillbitEndpoint endpoint, RpcBus<?> bus) {
+      synchronized (this) {
+        endpoints.put(channel, endpoint);
+        tunnels.put(endpoint, new BitTunnel(bus));
+      }
+    }
+
+  }
+
+  public void close() {
+    Closeables.closeQuietly(server);
+    for (BitTunnel bt : tunnels.values()) {
+      bt.shutdownIfClient();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
new file mode 100644
index 0000000..e17b25c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitServer.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.proto.ExecProtos.RpcType;
+import org.apache.drill.exec.rpc.BasicServer;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class BitServer extends BasicServer<RpcType>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServer.class);
+  
+  private final DrillbitContext context;
+  private final BitComHandler handler;
+  
+  public BitServer(BitComHandler handler, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
+    super(alloc, eventLoopGroup);
+    this.context = context;
+    this.handler = handler;
+  }
+  
+  @Override
+  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    return handler.getResponseDefaultInstance(rpcType);
+  }
+
+  @Override
+  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    return handler.handle(context, rpcType, pBody, dBody);
+  }
+
+  @Override
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch) {
+    
+    return super.getCloseHandler(ch);
+  }
+
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
new file mode 100644
index 0000000..02991ad
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitTunnel.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.bit;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.RpcBus;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Interface provided for communication between two bits.  Provided by both a server and a client implementation.
+ */
+public class BitTunnel {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitTunnel.class);
+
+  final RpcBus<?> bus;
+
+  public BitTunnel(RpcBus<?> bus){
+    this.bus = bus;
+  }
+  
+  public DrillRpcFuture<Ack> sendRecordBatch(FragmentContext context, RecordBatch batch){
+    return null;
+  }
+  
+  public DrillRpcFuture<FragmentHandle> sendFragment(FragmentContext context, PlanFragment fragment){
+    return null;
+  }
+  
+  public DrillRpcFuture<Ack> cancelFragment(FragmentContext context, FragmentHandle handle){
+    return null;
+  }
+  
+  public DrillRpcFuture<FragmentStatus> getFragmentStatus(FragmentContext context, FragmentHandle handle){
+    return null;
+  }
+  
+  public void shutdownIfClient(){
+    if(bus.isClient()) Closeables.closeQuietly(bus);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
new file mode 100644
index 0000000..cd6e15d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.google.protobuf.MessageLite;
+
+public class UserClient extends BasicClient<RpcType> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
+  
+  public UserClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+    super(alloc, eventLoopGroup);
+  }
+
+
+  public DrillRpcFuture<QueryHandle> submitQuery(RunQuery query, ByteBuf data) throws RpcException {
+    return this.send(RpcType.RUN_QUERY, query, QueryHandle.class, data);
+  }
+
+  @Override
+  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    switch(rpcType){
+    case RpcType.ACK_VALUE:
+      return Ack.getDefaultInstance();
+    case RpcType.HANDSHAKE_VALUE:
+      return BitToUserHandshake.getDefaultInstance();
+    case RpcType.QUERY_HANDLE_VALUE:
+      return QueryHandle.getDefaultInstance();
+    case RpcType.QUERY_RESULT_VALUE:
+      return QueryResult.getDefaultInstance();
+    }
+    throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
+  }
+
+
+  @Override
+  protected Response handle(SocketChannel ch, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    logger.debug("Received a server > client message of type " + rpcType);
+    return new Response(RpcType.ACK, Ack.getDefaultInstance(), null);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
new file mode 100644
index 0000000..fe70c85
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -0,0 +1,90 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc.user;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
+import org.apache.drill.exec.proto.UserProtos.QueryHandle;
+import org.apache.drill.exec.proto.UserProtos.QueryResult;
+import org.apache.drill.exec.proto.UserProtos.RpcType;
+import org.apache.drill.exec.proto.UserProtos.RunQuery;
+import org.apache.drill.exec.rpc.BasicServer;
+import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.protobuf.MessageLite;
+
+public class UserServer extends BasicServer<RpcType> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
+  
+  final DrillbitContext context;
+  
+  public UserServer(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, DrillbitContext context) {
+    super(alloc, eventLoopGroup);
+    this.context = context;
+  }
+
+  @Override
+  protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+    // a user server only expects acknowledgements on messages it creates.
+    switch (rpcType) {
+    case RpcType.ACK_VALUE:
+      return Ack.getDefaultInstance();
+    default:
+      throw new UnsupportedOperationException();
+    }
+
+  }
+
+  public DrillRpcFuture<QueryResult> sendResult(RunQuery query, ByteBuf data) throws RpcException {
+    return this.send(RpcType.QUERY_RESULT, query, QueryResult.class, data);
+  }
+  
+  
+  @Override
+  protected Response handle(SocketChannel channel, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    switch (rpcType) {
+    
+    case RpcType.HANDSHAKE_VALUE:
+//      logger.debug("Received handshake, responding in kind.");
+      return new Response(RpcType.HANDSHAKE, BitToUserHandshake.getDefaultInstance(), null);
+      
+    case RpcType.RUN_QUERY_VALUE:
+//      logger.debug("Received query to run.  Returning query handle.");
+      return new Response(RpcType.QUERY_HANDLE, QueryHandle.newBuilder().setQueryId(1).build(), null);
+      
+    case RpcType.REQUEST_RESULTS_VALUE:
+//      logger.debug("Received results requests.  Returning empty query result.");
+      return new Response(RpcType.QUERY_RESULT, QueryResult.getDefaultInstance(), null);
+      
+    default:
+      throw new UnsupportedOperationException();
+    }
+
+  }
+  
+  
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
new file mode 100644
index 0000000..e71d381
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/BackedRecord.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public class BackedRecord implements Record {
+    DiffSchema schema;
+    DataRecord record;
+
+    public BackedRecord(DiffSchema schema, DataRecord record) {
+        this.schema = schema;
+        this.record = record;
+    }
+
+    public void setBackend(DiffSchema schema, DataRecord record) {
+        this.record = record;
+        this.schema = schema;
+    }
+
+    @Override
+    public DiffSchema getSchemaChanges() {
+        return schema;
+    }
+
+    @Override
+    public Object getField(int fieldId) {
+        return record.getData(fieldId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
new file mode 100644
index 0000000..41738bb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DataRecord.java
@@ -0,0 +1,56 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+public class DataRecord {
+    private final Map<Integer, Object> dataMap;
+
+    public DataRecord() {
+        this.dataMap = Maps.newHashMap();
+    }
+
+    public void addData(int fieldId, Object data, boolean isList) {
+        //TODO: Rethink lists vs object data handling
+        if(!dataMap.containsKey(fieldId)) {
+            if(isList) {
+                dataMap.put(fieldId, Lists.newArrayList(data));
+            } else {
+                dataMap.put(fieldId, data);
+            }
+        } else {
+            if(isList) {
+                ((List)dataMap.get(fieldId)).add(data);
+            } else {
+                throw new IllegalStateException("Overriding field id existing data!");
+            }
+        }
+    }
+
+    public Object getData(int fieldId) {
+        Preconditions.checkArgument(dataMap.containsKey(fieldId));
+        return dataMap.get(fieldId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
new file mode 100644
index 0000000..016e097
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/DiffSchema.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class DiffSchema {
+    List<Field> addedFields;
+    List<Field> removedFields;
+
+    public DiffSchema() {
+        this.addedFields = Lists.newArrayList();
+        this.removedFields = Lists.newArrayList();
+    }
+
+    public void recordNewField(Field field) {
+        addedFields.add(field);
+    }
+
+    public boolean hasDiffFields() {
+        return !addedFields.isEmpty() || !removedFields.isEmpty();
+    }
+
+    public List<Field> getAddedFields() {
+        return addedFields;
+    }
+
+    public List<Field> getRemovedFields() {
+        return removedFields;
+    }
+
+    public void reset() {
+        addedFields.clear();
+        removedFields.clear();
+    }
+
+    public void addRemovedField(Field field) {
+        removedFields.add(field);
+    }
+
+    @Override
+    public String toString() {
+        return "DiffSchema{" +
+                "addedFields=" + addedFields +
+                ", removedFields=" + removedFields +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
new file mode 100644
index 0000000..e19c099
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Field.java
@@ -0,0 +1,135 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Strings;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public abstract class Field {
+    final FieldType fieldType;
+    int fieldId;
+    String prefixFieldName;
+    String fieldName;
+    RecordSchema schema;
+    RecordSchema parentSchema;
+    boolean read;
+
+
+    public Field(RecordSchema parentSchema, IdGenerator<Integer> generator, FieldType fieldType, String prefixFieldName) {
+        this.fieldId = generator.getNextId();
+        this.fieldType = fieldType;
+        this.prefixFieldName = prefixFieldName;
+        this.parentSchema = parentSchema;
+    }
+
+    public Field assignSchema(RecordSchema newSchema) {
+        checkState(schema == null, "Schema already assigned to field: %s", fieldName);
+        checkState(fieldType.isEmbedSchema(), "Schema cannot be assigned to non-embedded types: %s", fieldType);
+        schema = newSchema;
+        return this;
+    }
+
+    public String getFullFieldName() {
+        return Strings.isNullOrEmpty(prefixFieldName) ? fieldName : prefixFieldName + "." + fieldName;
+    }
+
+    public int getFieldId() {
+        return fieldId;
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    public void setRead(boolean read) {
+        this.read = read;
+    }
+
+    protected abstract Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper);
+
+    Objects.ToStringHelper getAttributesStringHelper() {
+        return Objects.toStringHelper(this).add("type", fieldType)
+                .add("id", fieldId)
+                .add("fullFieldName", getFullFieldName())
+                .add("schema", schema == null ? null : schema.toSchemaString()).omitNullValues();
+    }
+
+    @Override
+    public String toString() {
+        return addAttributesToHelper(getAttributesStringHelper()).toString();
+    }
+
+    public RecordSchema getParentSchema() {
+        return parentSchema;
+    }
+
+    public RecordSchema getAssignedSchema() {
+        return schema;
+    }
+
+    public FieldType getFieldType() {
+        return fieldType;
+    }
+
+    public void assignSchemaIfNull(RecordSchema newSchema) {
+        if (!hasSchema()) {
+            schema = newSchema;
+        }
+    }
+
+    public boolean isRead() {
+        return read;
+    }
+
+    public boolean hasSchema() {
+        return schema != null;
+    }
+
+    public static enum FieldType {
+        INTEGER(1),
+        FLOAT(2),
+        BOOLEAN(3),
+        STRING(4),
+        ARRAY(5, true),
+        MAP(6, true);
+
+        byte value;
+        boolean embedSchema;
+
+        FieldType(int value, boolean embedSchema) {
+            this.value = (byte) value;
+            this.embedSchema = embedSchema;
+        }
+
+        FieldType(int value) {
+            this(value, false);
+        }
+
+        public byte value() {
+            return value;
+        }
+
+        public boolean isEmbedSchema() {
+            return embedSchema;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
new file mode 100644
index 0000000..728e8e1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/IdGenerator.java
@@ -0,0 +1,13 @@
+package org.apache.drill.exec.schema;
+
+/**
+ * Created with IntelliJ IDEA.
+ * User: tnachen
+ * Date: 1/2/13
+ * Time: 10:50 PM
+ * To change this template use File | Settings | File Templates.
+ */
+public interface IdGenerator<T> {
+    public T getNextId();
+    public void reset();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
new file mode 100644
index 0000000..efdc8fd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ListSchema.java
@@ -0,0 +1,108 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ListSchema implements RecordSchema {
+    private List<Field> fields;
+
+    public ListSchema() {
+        this.fields = Lists.newArrayList();
+    }
+
+    @Override
+    public void addField(Field field) {
+        if (field.fieldType.isEmbedSchema() || fields.isEmpty() || !isSingleTyped() ||
+                !Iterables.getOnlyElement(fields).equals(field.getFieldType())) {
+            fields.add(field);
+        }
+    }
+
+    @Override
+    public Field getField(String fieldName, int index) {
+        Field field;
+        if (isSingleTyped()) {
+            field = Iterables.getOnlyElement(fields, null);
+        } else {
+            field = index < fields.size() ? fields.get(index) : null;
+        }
+
+        return field;
+    }
+
+    @Override
+    public void removeField(Field field, int index) {
+        checkArgument(fields.size() > index);
+        checkArgument(checkNotNull(fields.get(index)).getFieldId() == field.getFieldId());
+        fields.remove(index);
+    }
+
+    @Override
+    public Iterable<? extends Field> getFields() {
+        return fields;
+    }
+
+    public boolean isSingleTyped() {
+        return fields.size() <= 1;
+    }
+
+    @Override
+    public String toSchemaString() {
+        StringBuilder builder = new StringBuilder("List_fields:[");
+        for (Field field : fields) {
+            builder.append(field.toString());
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    @Override
+    public void resetMarkedFields() {
+        for (Field field : fields) {
+            field.setRead(false);
+        }
+    }
+
+    @Override
+    public Iterable<? extends Field> removeUnreadFields() {
+        final List<Field> removedFields = Lists.newArrayList();
+        Iterables.removeIf(fields, new Predicate<Field>() {
+            @Override
+            public boolean apply(Field field) {
+                if (!field.isRead()) {
+                    removedFields.add(field);
+                    return true;
+                } else if(field.hasSchema()) {
+                    Iterables.addAll(removedFields, field.getAssignedSchema().removeUnreadFields());
+                }
+
+                return false;
+            }
+        });
+        return removedFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
new file mode 100644
index 0000000..aa0d6aa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/NamedField.java
@@ -0,0 +1,44 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+
+public class NamedField extends Field {
+    final FieldType keyType;
+
+    public NamedField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType) {
+        this(parentSchema, generator, prefixFieldName, fieldName, fieldType, FieldType.STRING);
+    }
+
+    public NamedField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, FieldType keyType) {
+        super(parentSchema, generator, fieldType, prefixFieldName);
+        this.fieldName = fieldName;
+        this.keyType = FieldType.STRING;
+    }
+
+    public String getFieldName() {
+        return fieldName;
+    }
+
+    @Override
+    protected Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper) {
+        return helper.add("keyType", keyType);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
new file mode 100644
index 0000000..9cc30f6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/ObjectSchema.java
@@ -0,0 +1,91 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+public class ObjectSchema implements RecordSchema {
+    private final Map<String, Field> fields;
+
+    public ObjectSchema() {
+        fields = Maps.newHashMap();
+    }
+
+    @Override
+    public void addField(Field field) {
+        fields.put(field.getFieldName(), field);
+    }
+
+    @Override
+    public Field getField(String fieldName, int index) {
+        return fields.get(fieldName);
+    }
+
+    @Override
+    public void removeField(Field field, int index) {
+        fields.remove(field.getFieldName());
+    }
+
+    @Override
+    public Iterable<? extends Field> getFields() {
+        return fields.values();
+    }
+
+    @Override
+    public String toSchemaString() {
+        StringBuilder builder = new StringBuilder("Object_fields:[");
+        for (Field field : fields.values()) {
+            builder.append(field.toString()).append(" ");
+        }
+        builder.append("]");
+        return builder.toString();
+    }
+
+    @Override
+    public void resetMarkedFields() {
+        for (Field field : fields.values()) {
+            field.setRead(false);
+        }
+    }
+
+    @Override
+    public Iterable<? extends Field> removeUnreadFields() {
+        final List<Field> removedFields = Lists.newArrayList();
+        Iterables.removeIf(fields.values(), new Predicate<Field>() {
+            @Override
+            public boolean apply(Field field) {
+                if (!field.isRead()) {
+                    removedFields.add(field);
+                    return true;
+                } else if (field.hasSchema()) {
+                    Iterables.addAll(removedFields, field.getAssignedSchema().removeUnreadFields());
+                }
+
+                return false;
+            }
+        });
+        return removedFields;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
new file mode 100644
index 0000000..67fd2fa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/OrderedField.java
@@ -0,0 +1,33 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.google.common.base.Objects;
+
+public class OrderedField extends Field {
+    public OrderedField(RecordSchema parentSchema, IdGenerator<Integer> generator, FieldType fieldType, String prefixFieldName, int index) {
+        super(parentSchema, generator, fieldType, prefixFieldName);
+        this.fieldName = "[" + index + "]";
+    }
+
+    @Override
+    protected Objects.ToStringHelper addAttributesToHelper(Objects.ToStringHelper helper) {
+        return helper;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
new file mode 100644
index 0000000..16e83dc
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/Record.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+
+import java.io.IOException;
+
+public interface Record {
+    public DiffSchema getSchemaChanges();
+    public Object getField(int fieldId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
new file mode 100644
index 0000000..db1f0ed
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/RecordSchema.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public interface RecordSchema {
+    public void addField(Field field);
+    public Field getField(String fieldName, int index);
+    public void removeField(Field field, int index);
+    public Iterable<? extends Field> getFields();
+    public String toSchemaString();
+    void resetMarkedFields();
+    Iterable<? extends Field> removeUnreadFields();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
new file mode 100644
index 0000000..27ed2d8
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaIdGenerator.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+public class SchemaIdGenerator implements IdGenerator<Integer> {
+    private int nextId;
+
+    public SchemaIdGenerator() {
+        nextId = 1;
+    }
+
+    public Integer getNextId() {
+        return nextId++;
+    }
+
+    @Override
+    public void reset() {
+        nextId = 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
new file mode 100644
index 0000000..54a4e0e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/SchemaRecorder.java
@@ -0,0 +1,122 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.drill.exec.schema.json.jackson.JacksonHelper;
+import org.apache.drill.exec.schema.json.jackson.ScanJson;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SchemaRecorder {
+    DiffSchema diffSchema;
+    RecordSchema currentSchema;
+    List<Field> removedFields;
+
+    public SchemaRecorder() {
+        currentSchema = new ObjectSchema();
+        diffSchema = new DiffSchema();
+        removedFields = Lists.newArrayList();
+    }
+
+    public RecordSchema getCurrentSchema() {
+        return currentSchema;
+    }
+
+    public void recordData(ScanJson.ReadType currentReadType, ScanJson.ReadType readType, JsonParser parser, IdGenerator generator, DataRecord record, Field.FieldType fieldType, String prefixFieldName, String fieldName, int index) throws IOException {
+        Field field = currentSchema.getField(fieldName, index);
+
+        if (field == null || field.getFieldType() != fieldType) {
+            if (field != null) {
+                removeStaleField(index, field);
+            }
+            field = currentReadType.createField(currentSchema, generator, prefixFieldName, fieldName, fieldType, index);
+            field.setRead(true);
+            diffSchema.recordNewField(field);
+            currentSchema.addField(field);
+        } else {
+            field.setRead(true);
+        }
+
+        if (readType != null) {
+            RecordSchema origSchema = currentSchema;
+            if (field != null) {
+                currentSchema = field.getAssignedSchema();
+            }
+
+            RecordSchema newSchema = readType.createSchema();
+            field.assignSchemaIfNull(newSchema);
+            setCurrentSchemaIfNull(newSchema);
+            readType.readRecord(parser, generator, this, record, field.getFullFieldName());
+
+            currentSchema = origSchema;
+        } else {
+            RecordSchema schema = field.getParentSchema();
+            record.addData(field.getFieldId(), JacksonHelper.getValueFromFieldType(parser, fieldType), schema != null && schema instanceof ListSchema);
+        }
+    }
+
+    private void removeStaleField(int index, Field field) {
+        if (field.hasSchema()) {
+            removeChildFields(field);
+        }
+        removedFields.add(field);
+        currentSchema.removeField(field, index);
+    }
+
+    private void removeChildFields(Field field) {
+        RecordSchema schema = field.getAssignedSchema();
+        if(schema == null) { return; }
+        for (Field childField : schema.getFields()) {
+            removedFields.add(childField);
+            if (childField.hasSchema()) {
+                removeChildFields(childField);
+            }
+        }
+    }
+
+    public boolean hasDiffs() {
+        return diffSchema.hasDiffFields();
+    }
+
+    public DiffSchema getDiffSchema() {
+        return hasDiffs() ? diffSchema : null;
+    }
+
+    public void setCurrentSchemaIfNull(RecordSchema newSchema) {
+        if (currentSchema == null) {
+            currentSchema = newSchema;
+        }
+    }
+
+    public void reset() {
+        currentSchema.resetMarkedFields();
+        diffSchema.reset();
+        removedFields.clear();
+    }
+
+    public void addMissingFields() {
+        for (Field field : Iterables.concat(currentSchema.removeUnreadFields(), removedFields)) {
+            diffSchema.addRemovedField(field);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
new file mode 100644
index 0000000..0643710
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/JacksonHelper.java
@@ -0,0 +1,63 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.collect.Maps;
+
+import org.apache.drill.exec.schema.Field;
+
+import java.io.IOException;
+import java.util.EnumMap;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class JacksonHelper {
+    private static final EnumMap<JsonToken, Field.FieldType> TYPE_LOOKUP = Maps.newEnumMap(JsonToken.class);
+
+    static {
+        TYPE_LOOKUP.put(JsonToken.VALUE_STRING, Field.FieldType.STRING);
+        TYPE_LOOKUP.put(JsonToken.VALUE_FALSE, Field.FieldType.BOOLEAN);
+        TYPE_LOOKUP.put(JsonToken.VALUE_TRUE, Field.FieldType.BOOLEAN);
+        TYPE_LOOKUP.put(JsonToken.START_ARRAY, Field.FieldType.ARRAY);
+        TYPE_LOOKUP.put(JsonToken.START_OBJECT, Field.FieldType.MAP);
+        TYPE_LOOKUP.put(JsonToken.VALUE_NUMBER_INT, Field.FieldType.INTEGER);
+        TYPE_LOOKUP.put(JsonToken.VALUE_NUMBER_FLOAT, Field.FieldType.FLOAT);
+    }
+
+    public static Field.FieldType getFieldType(JsonToken token) {
+        return TYPE_LOOKUP.get(token);
+    }
+
+    public static Object getValueFromFieldType(JsonParser parser, Field.FieldType fieldType) throws IOException {
+        switch(fieldType) {
+            case INTEGER:
+                return parser.getIntValue();
+            case STRING:
+                return parser.getValueAsString();
+            case FLOAT:
+                return parser.getFloatValue();
+            case BOOLEAN:
+                return parser.getBooleanValue();
+            default:
+                throw new RuntimeException("Unexpected Field type to return value: " + fieldType.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
new file mode 100644
index 0000000..e450ee9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperator.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.google.common.collect.Lists;
+
+import org.apache.drill.exec.schema.Record;
+
+import java.io.IOException;
+import java.util.List;
+
+public abstract class PhysicalOperator {
+    List<PhysicalOperatorIterator> parents;
+
+    public PhysicalOperator(PhysicalOperatorIterator... parents) {
+        this.parents = Lists.newArrayList(parents);
+    }
+
+    public abstract PhysicalOperatorIterator getIterator();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
new file mode 100644
index 0000000..bf4053e
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/PhysicalOperatorIterator.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import org.apache.drill.exec.schema.Record;
+
+import java.io.IOException;
+
+public interface PhysicalOperatorIterator{
+    public enum NextOutcome {NONE_LEFT, INCREMENTED_SCHEMA_UNCHANGED, INCREMENTED_SCHEMA_CHANGED}
+    public Record getRecord();
+    public NextOutcome next() throws IOException;
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
new file mode 100644
index 0000000..a1c30e9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/json/jackson/ScanJson.java
@@ -0,0 +1,203 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.json.jackson;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonToken;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import com.google.common.io.InputSupplier;
+import com.google.common.io.Resources;
+
+import org.apache.drill.exec.schema.*;
+
+import java.io.*;
+import java.util.Map;
+
+public class ScanJson extends PhysicalOperator {
+    private ScanJsonIterator iterator;
+
+    private static final Map<JsonToken, ReadType> READ_TYPES = Maps.newHashMap();
+
+    static {
+        READ_TYPES.put(JsonToken.START_ARRAY, ReadType.ARRAY);
+        READ_TYPES.put(JsonToken.START_OBJECT, ReadType.OBJECT);
+    }
+
+    public ScanJson(String inputName) throws IOException {
+        super();
+        this.iterator = new ScanJsonIterator(inputName);
+    }
+
+    @Override
+    public PhysicalOperatorIterator getIterator() {
+        return iterator;
+    }
+
+    class ScanJsonIterator implements PhysicalOperatorIterator {
+        private JsonParser parser;
+        private SchemaRecorder recorder;
+        private BackedRecord record;
+        private IdGenerator generator;
+
+        private ScanJsonIterator(String inputName) throws IOException {
+            InputSupplier<InputStreamReader> input;
+            if (inputName.startsWith("resource:")) {
+                input = Resources.newReaderSupplier(Resources.getResource(inputName.substring(inputName.indexOf(':') + 1)), Charsets.UTF_8);
+            } else {
+                input = Files.newReaderSupplier(new File(inputName), Charsets.UTF_8);
+            }
+
+            JsonFactory factory = new JsonFactory();
+            parser = factory.createJsonParser(input.getInput());
+            parser.nextToken(); // Read to the first START_OBJECT token
+            recorder = new SchemaRecorder();
+            generator = new SchemaIdGenerator();
+        }
+
+        @Override
+        public Record getRecord() {
+            return record;
+        }
+
+        @Override
+        public NextOutcome next() throws IOException {
+            if (parser.isClosed() || !parser.hasCurrentToken()) {
+                return NextOutcome.NONE_LEFT;
+            }
+
+            recorder.reset();
+
+            DataRecord dataRecord = new DataRecord();
+            ReadType.OBJECT.readRecord(parser, generator, recorder, dataRecord, null);
+
+            parser.nextToken(); // Read to START_OBJECT token
+
+            if (!parser.hasCurrentToken()) {
+                parser.close();
+            }
+
+            recorder.addMissingFields();
+            if (record == null) {
+                record = new BackedRecord(recorder.getDiffSchema(), dataRecord);
+            } else {
+                record.setBackend(recorder.getDiffSchema(), dataRecord);
+            }
+            return recorder.hasDiffs() ? NextOutcome.INCREMENTED_SCHEMA_CHANGED : NextOutcome.INCREMENTED_SCHEMA_UNCHANGED;
+        }
+
+        public RecordSchema getCurrentSchema() {
+            return recorder.getCurrentSchema();
+        }
+    }
+
+    public static enum ReadType {
+        ARRAY(JsonToken.END_ARRAY) {
+            @Override
+            public Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index) {
+                return new OrderedField(parentSchema, generator, fieldType, prefixFieldName, index);
+            }
+
+            @Override
+            public RecordSchema createSchema() throws IOException {
+                return new ListSchema();
+            }
+        },
+        OBJECT(JsonToken.END_OBJECT) {
+            @Override
+            public Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index) {
+                return new NamedField(parentSchema, generator, prefixFieldName, fieldName, fieldType);
+            }
+
+            @Override
+            public RecordSchema createSchema() throws IOException {
+                return new ObjectSchema();
+            }
+        };
+
+        private final JsonToken endObject;
+
+        ReadType(JsonToken endObject) {
+            this.endObject = endObject;
+        }
+
+        public JsonToken getEndObject() {
+            return endObject;
+        }
+
+        public void readRecord(JsonParser parser, IdGenerator generator, SchemaRecorder recorder, DataRecord record, String prefixFieldName) throws IOException {
+            JsonToken token = parser.nextToken();
+            JsonToken endObject = getEndObject();
+            int index = 0;
+            while (token != endObject) {
+                if (token == JsonToken.FIELD_NAME) {
+                    token = parser.nextToken();
+                    continue;
+                }
+
+                String fieldName = parser.getCurrentName();
+                Field.FieldType fieldType = JacksonHelper.getFieldType(token);
+                ReadType readType = READ_TYPES.get(token);
+                if (fieldType != null) { // Including nulls
+                    recorder.recordData(this, readType, parser, generator, record, fieldType, prefixFieldName, fieldName, index);
+                }
+                token = parser.nextToken();
+                ++index;
+            }
+        }
+
+        public abstract RecordSchema createSchema() throws IOException;
+
+        public abstract Field createField(RecordSchema parentSchema, IdGenerator<Integer> generator, String prefixFieldName, String fieldName, Field.FieldType fieldType, int index);
+    }
+
+    public static void main(String[] args) throws IOException {
+        if (args.length != 1) {
+            System.err.println("Requires json path: ScanJson <json_path>");
+            return;
+        }
+
+        String jsonPath = args[0];
+
+        System.out.println("Reading json input...");
+        ScanJson sj = new ScanJson(jsonPath);
+        ScanJsonIterator iterator = (ScanJsonIterator) sj.getIterator();
+        long count = 0;
+
+        while (iterator.next() != PhysicalOperatorIterator.NextOutcome.NONE_LEFT) {
+            Record record = iterator.getRecord();
+            System.out.println("Record " + ++count);
+            System.out.println("Schema: ");
+            System.out.println(iterator.getCurrentSchema().toSchemaString());
+            System.out.println();
+            System.out.println("Changes since last record: ");
+            System.out.println();
+            System.out.println(record.getSchemaChanges());
+            System.out.println();
+        }
+    }
+}
+
+
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
new file mode 100644
index 0000000..a81a9d9
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/ProtobufSchemaTransformer.java
@@ -0,0 +1,109 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.transform;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
+import com.google.protobuf.DescriptorProtos;
+
+import org.apache.drill.exec.schema.Field;
+import org.apache.drill.exec.schema.ListSchema;
+import org.apache.drill.exec.schema.ObjectSchema;
+import org.apache.drill.exec.schema.RecordSchema;
+
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ProtobufSchemaTransformer implements SchemaTransformer<DescriptorProtos.DescriptorProto> {
+    private static final Map<Field.FieldType, Function<Field, Object>> FIELD_MAP = Maps.newEnumMap(Field.FieldType.class);
+    private static final Map<Field.FieldType, DescriptorProtos.FieldDescriptorProto.Type> TYPE_MAP = Maps.newEnumMap(Field.FieldType.class);
+    private int fieldIndex = 0;
+    public static final String LIST_SCHEMA_NAME = "_EmbeddedList"; //Read from config?
+
+    static {
+        TYPE_MAP.put(Field.FieldType.BOOLEAN, DescriptorProtos.FieldDescriptorProto.Type.TYPE_BOOL);
+        TYPE_MAP.put(Field.FieldType.STRING, DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING);
+        TYPE_MAP.put(Field.FieldType.FLOAT, DescriptorProtos.FieldDescriptorProto.Type.TYPE_FLOAT);
+        TYPE_MAP.put(Field.FieldType.INTEGER, DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT32);
+    }
+
+    private DescriptorProtos.DescriptorProto.Builder transformSchema(String name, DescriptorProtos.DescriptorProto.Builder parentBuilder, RecordSchema schema) {
+        if (schema instanceof ObjectSchema) {
+            return addObjectSchema(name, parentBuilder, ObjectSchema.class.cast(schema));
+        } else if (schema instanceof ListSchema) {
+            return addListSchema(name, ListSchema.class.cast(schema));
+        } else {
+            throw new RuntimeException("Unknown schema passed to transformer: " + schema);
+        }
+    }
+
+    public DescriptorProtos.DescriptorProto transformSchema(String name, RecordSchema schema) {
+        return transformSchema(name, null, schema).build();
+    }
+
+    private DescriptorProtos.DescriptorProto.Builder addListSchema(String name, ListSchema schema) {
+        DescriptorProtos.DescriptorProto.Builder builder = DescriptorProtos.DescriptorProto.newBuilder().setName(name);
+        DescriptorProtos.FieldDescriptorProto.Builder builderForValue = DescriptorProtos.FieldDescriptorProto.newBuilder();
+        builderForValue.setTypeName(LIST_SCHEMA_NAME).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+        builder.addField(builderForValue);
+        return builder;
+    }
+
+    private DescriptorProtos.DescriptorProto.Builder addObjectSchema(String name,
+                                                                     DescriptorProtos.DescriptorProto.Builder parentBuilder,
+                                                                     ObjectSchema schema) {
+        DescriptorProtos.DescriptorProto.Builder builder = DescriptorProtos.DescriptorProto.newBuilder().setName(name);
+        for (Field field : schema.getFields()) {
+            DescriptorProtos.FieldDescriptorProto.Builder builderForValue = DescriptorProtos.FieldDescriptorProto.newBuilder();
+            String fieldName = field.getFieldName();
+            builderForValue.setName(fieldName).setLabel(DescriptorProtos.FieldDescriptorProto.Label.LABEL_OPTIONAL).setNumber(++fieldIndex);
+            if (field.hasSchema()) {
+                RecordSchema innerSchema = field.getAssignedSchema();
+                if (innerSchema instanceof ObjectSchema) {
+                    addObjectSchema(fieldName, builder, (ObjectSchema) innerSchema);
+                    DescriptorProtos.DescriptorProto innerProto = Iterables.getLast(builder.getNestedTypeList());
+                    builderForValue.setTypeName(innerProto.getName()).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+                } else if (innerSchema instanceof ListSchema) {
+                    builderForValue.setTypeName(LIST_SCHEMA_NAME).setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE);
+                }
+            } else {
+                builderForValue.setType(getProtoType(field.getFieldType()));
+            }
+            builder.addField(builderForValue);
+        }
+
+        if (parentBuilder != null) {
+            parentBuilder.addNestedType(builder);
+        }
+
+        return builder;
+    }
+
+    private DescriptorProtos.FieldDescriptorProto.Type getProtoType(Field.FieldType fieldType) {
+        return checkNotNull(TYPE_MAP.get(fieldType));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
new file mode 100644
index 0000000..54d851f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/schema/transform/SchemaTransformer.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+
+package org.apache.drill.exec.schema.transform;
+
+import org.apache.drill.exec.schema.Field;
+import org.apache.drill.exec.schema.ListSchema;
+import org.apache.drill.exec.schema.ObjectSchema;
+import org.apache.drill.exec.schema.RecordSchema;
+
+import java.util.List;
+
+public interface SchemaTransformer<T> {
+    public T transformSchema(String name, RecordSchema schema);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
new file mode 100644
index 0000000..6cc35e2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import java.net.InetAddress;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.HazelCache;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.service.ServiceEngine;
+
+import com.google.common.io.Closeables;
+
+public class Drillbit {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
+
+  public static void main(String[] cli) throws DrillbitStartupException, InterruptedException {
+    Drillbit bit = null;
+    try {
+      logger.debug("Setting up Drillbit.");
+      StartupOptions options = StartupOptions.parse(cli);
+      DrillConfig config = DrillConfig.create(options.getConfigLocation());
+      bit = new Drillbit(config);
+    } catch (Exception ex) {
+      throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
+    }
+
+    
+    try {
+      logger.debug("Starting Drillbit.");
+      bit.run();
+    } catch (Exception e) {
+      throw new DrillbitStartupException("Failure during initial startup of Drillbit.", e);
+    }
+    Thread.sleep(10000);
+    // at this point, the main thread can terminate as we have started all our working threads.
+  }
+
+  private final DrillbitContext context;
+  final BufferAllocator pool;
+  final ClusterCoordinator coord;
+  final ServiceEngine engine;
+  final DistributedCache cache;
+  private RegistrationHandle handle;
+
+  public Drillbit(DrillConfig config) throws Exception {
+    final DrillbitContext context = new DrillbitContext(config, this);
+    Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
+    this.context = context;
+    this.pool = BufferAllocator.getAllocator(context);
+    this.coord = new ZKClusterCoordinator(config);
+    this.engine = new ServiceEngine(context);
+    this.cache = new HazelCache(context.getConfig());
+  }
+
+  public void run() throws Exception {
+    coord.start();
+    engine.start();
+    
+    DrillbitEndpoint md = DrillbitEndpoint.newBuilder().setAddress(InetAddress.getLocalHost().getHostAddress())
+        .setBitPort(engine.getBitPort()).setUserPort(engine.getUserPort()).build();
+    handle = coord.register(md);
+    cache.run(md);
+  }
+
+  public void close() {
+    if (coord != null) coord.unregister(handle);
+
+    try {
+      Thread.sleep(context.getConfig().getInt(ExecConstants.ZK_REFRESH) * 2);
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted while sleeping during coordination deregistration.");
+    }
+
+    Closeables.closeQuietly(engine);
+    Closeables.closeQuietly(coord);
+    Closeables.closeQuietly(pool);
+    logger.info("Shutdown completed.");
+  }
+
+  private class ShutdownThread extends Thread {
+    ShutdownThread(DrillConfig config) {
+      this.setName("ShutdownHook");
+    }
+
+    @Override
+    public void run() {
+      logger.info("Received shutdown request.");
+      close();
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
new file mode 100644
index 0000000..94c8207
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -0,0 +1,65 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.server;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.BufferAllocator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.drill.exec.rpc.bit.BitCom;
+
+public class DrillbitContext {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
+  
+  private final DrillConfig config;
+  private final Drillbit underlyingBit;
+  private final NioEventLoopGroup loop;
+
+  public DrillbitContext(DrillConfig config, Drillbit underlyingBit) {
+    super();
+    this.config = config;
+    this.underlyingBit = underlyingBit;
+    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
+  }
+  
+  public DrillConfig getConfig() {
+    return config;
+  }
+  
+  public List<DrillbitEndpoint> getBits(){
+    return underlyingBit.coord.getAvailableEndpoints();
+  }
+
+  public BufferAllocator getAllocator(){
+    return underlyingBit.pool;
+  }
+  
+  
+  public NioEventLoopGroup getBitLoopGroup(){
+    return loop;
+  }
+  
+  public BitCom getBitCom(){
+    return underlyingBit.engine.getBitCom();
+  }
+  
+}