You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/04/14 04:35:12 UTC

[8/9] basic framework for physical plan. abstraction of graph classes.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java b/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
new file mode 100644
index 0000000..41cba45
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/rse/ReferenceStorageEngine.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ref.rse;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.exec.ref.rops.ROP;
+
+
+/** Storage engine abstraction: read/write capability flags plus factories for readers and writers. */
+public interface ReferenceStorageEngine {
+  /** Partitioning options. Not referenced by any method of this interface. */
+  enum PartitionCapabilities { NONE, HASH, SORTED }
+
+  /** Memory layout options. Not referenced by any method of this interface. */
+  enum MemoryFormat { RECORD, FIELD }
+
+  /** Marker type for the entries returned by getReadEntries and consumed by getReader. */
+  interface ReadEntry {}
+
+  boolean supportsRead();
+  boolean supportsWrite();
+
+  Collection<ReadEntry> getReadEntries(Scan scan) throws IOException;
+  RecordReader getReader(ReadEntry readEntry, ROP parentROP) throws IOException;
+  RecordRecorder getWriter(Store store) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/BufferAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/BufferAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/BufferAllocator.java
new file mode 100644
index 0000000..a398607
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/BufferAllocator.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+
+import java.io.Closeable;
+
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Abstraction over byte buffer allocation so that callers only go through the methods declared here.
+ */
+public abstract class BufferAllocator implements Closeable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferAllocator.class);
+  /** Creates an allocator. The context is currently ignored; a DirectBufferAllocator is always returned. */
+  public static BufferAllocator getAllocator(DrillbitContext context) {
+    // TODO: support alternative allocators (including a debugging allocator that records all allocation locations for each buffer).
+    return new DirectBufferAllocator();
+  }
+
+  /**
+   * Returns a new or reused buffer for the requested size. The underlying memory may be rounded up
+   * beyond the request, but the buffer's capacity will be set to the configured size.
+   * @param size The size in bytes.
+   * @return The allocated ByteBuf.
+   */
+  public abstract ByteBuf buffer(int size);
+
+  /** Returns the underlying Netty ByteBufAllocator. */
+  public abstract ByteBufAllocator getUnderlyingAllocator();
+
+  /** Closes this allocator and releases all buffers generated from it. */
+  @Override
+  public abstract void close();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/DirectBufferAllocator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/DirectBufferAllocator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/DirectBufferAllocator.java
new file mode 100644
index 0000000..8c81dd6
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/DirectBufferAllocator.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+
+/** BufferAllocator backed by a Netty PooledByteBufAllocator created with preferDirect = true. */
+public class DirectBufferAllocator extends BufferAllocator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBufferAllocator.class);
+
+  private final PooledByteBufAllocator pool = new PooledByteBufAllocator(true);
+
+  /** Hands out a direct buffer of the requested size from the pool. */
+  @Override
+  public ByteBuf buffer(int size) {
+    return pool.directBuffer(size);
+  }
+
+  /** Returns the pooled allocator itself. */
+  @Override
+  public ByteBufAllocator getUnderlyingAllocator() {
+    return pool;
+  }
+
+  /** Currently a no-op: no buffers are tracked or released here yet. */
+  @Override
+  public void close() {
+    // TODO: collect all buffers and release them away using a weak hashmap so we don't impact pool work
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
new file mode 100644
index 0000000..d81870b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -0,0 +1,31 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec;
+
+public interface ExecConstants { // Configuration keys; interface fields are implicitly public static final.
+  String ZK_RETRY_TIMES = "drill.exec.zk.retry.count";
+  String ZK_RETRY_DELAY = "drill.exec.zk.retry.delay";
+  String ZK_CONNECTION = "drill.exec.zk.connect";
+  String ZK_TIMEOUT = "drill.exec.zk.timeout";
+  String ZK_ROOT = "drill.exec.zk.root";
+  String ZK_REFRESH = "drill.exec.zk.refresh";
+  String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
+  String SERVICE_NAME = "drill.exec.cluster-id";
+  String INITIAL_BIT_PORT = "drill.exec.rpc.bit.port";
+  String INITIAL_USER_PORT = "drill.exec.rpc.user.port";
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
new file mode 100644
index 0000000..1684960
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -0,0 +1,38 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.WorkQueueStatus;
+
+
+public interface DistributedCache extends Closeable {
+  org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
+  // Lifecycle entry point; implementations receive the endpoint they should report under.
+  void run(DrillbitEndpoint endpoint) throws DrillbitStartupException;
+  // Optimized physical plans, stored and looked up by their logical plan.
+  void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical);
+  TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical);
+  // Work queue lengths: report the local value, read the known values.
+  void updateLocalQueueLength(int length);
+  List<WorkQueueStatus> getQueueLengths();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
new file mode 100644
index 0000000..cc73799
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -0,0 +1,133 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.WorkQueueStatus;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.hazelcast.config.Config;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+import com.hazelcast.nio.DataSerializable;
+
+public class HazelCache implements DistributedCache {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
+
+  private final String instanceName;
+  private HazelcastInstance instance;
+  private ITopic<WrappedWorkQueueStatus> workQueueLengths;
+  private DrillbitEndpoint endpoint;
+  // Latest reported status per endpoint, so a new report replaces the sender's previous one instead of piling up.
+  private Cache<DrillbitEndpoint, WorkQueueStatus> endpoints;
+  private IMap<TemplatizedLogicalPlan, TemplatizedPhysicalPlan> optimizedPlans;
+
+  public HazelCache(DrillConfig config) {
+    this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+  }
+
+  private class Listener implements MessageListener<WrappedWorkQueueStatus> {
+    @Override
+    public void onMessage(Message<WrappedWorkQueueStatus> wrapped) {
+      logger.debug("Received new queue length message.");
+      WorkQueueStatus status = wrapped.getMessageObject().status;
+      endpoints.put(status.getEndpoint(), status);
+    }
+  }
+
+  /** Starts the Hazelcast instance and binds the topic, plan map and status cache. Call before any other method. */
+  @Override
+  public void run(DrillbitEndpoint endpoint) {
+    Config c = new Config();
+    // TODO: utilize cluster membership to set up other nodes.
+    c.setInstanceName(instanceName);
+    instance = Hazelcast.newHazelcastInstance(c);
+    workQueueLengths = instance.getTopic("queue-length");
+    optimizedPlans = instance.getMap("plan-optimizations");
+    this.endpoint = endpoint;
+    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
+    workQueueLengths.addMessageListener(new Listener());
+  }
+
+  @Override
+  public void saveOptimizedPlan(TemplatizedLogicalPlan logical, TemplatizedPhysicalPlan physical) {
+    optimizedPlans.put(logical, physical);
+  }
+
+  @Override
+  public TemplatizedPhysicalPlan getOptimizedPlan(TemplatizedLogicalPlan logical) {
+    return optimizedPlans.get(logical);
+  }
+
+  @Override
+  public void updateLocalQueueLength(int length) {
+    workQueueLengths.publish(new WrappedWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
+        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
+  }
+
+  @Override
+  public List<WorkQueueStatus> getQueueLengths() {
+    return new java.util.ArrayList<WorkQueueStatus>(endpoints.asMap().values()); // snapshot copy
+  }
+
+  // Topic payload: a WorkQueueStatus written as a short length followed by its protobuf bytes.
+  // Static with a public no-arg constructor because Hazelcast instantiates it reflectively before readData.
+  public static class WrappedWorkQueueStatus implements DataSerializable {
+    public WorkQueueStatus status;
+
+    public WrappedWorkQueueStatus() {} // used by Hazelcast during deserialization
+
+    public WrappedWorkQueueStatus(WorkQueueStatus status) {
+      this.status = status;
+    }
+
+    @Override
+    public void readData(DataInput arg0) throws IOException {
+      int len = arg0.readShort();
+      byte[] b = new byte[len];
+      arg0.readFully(b);
+      this.status = WorkQueueStatus.parseFrom(b);
+    }
+
+    @Override
+    public void writeData(DataOutput arg0) throws IOException {
+      byte[] b = status.toByteArray();
+      if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
+      arg0.writeShort(b.length);
+      arg0.write(b);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (instance != null) instance.getLifecycleService().shutdown(); // run() may never have been called
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
new file mode 100644
index 0000000..5ad9ef1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedLogicalPlan.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+public class TemplatizedLogicalPlan { // Placeholder type: declares no state or behavior beyond the logger.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedLogicalPlan.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
new file mode 100644
index 0000000..643720c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/cache/TemplatizedPhysicalPlan.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.cache;
+
+public class TemplatizedPhysicalPlan { // Placeholder type: declares no state or behavior beyond the logger.
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TemplatizedPhysicalPlan.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
new file mode 100644
index 0000000..9c7eab2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ClusterCoordinator.java
@@ -0,0 +1,47 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+/**
+ * Pluggable abstraction for cluster coordination. Lets a Drillbit or DrillClient register its capabilities
+ * and learn about the existence and capabilities of other nodes.
+ */
+public abstract class ClusterCoordinator implements Closeable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ClusterCoordinator.class);
+
+  /** Token returned by register and later handed back to unregister. */
+  public interface RegistrationHandle {
+  }
+
+  /** Starts the coordinator. */
+  public abstract void start() throws Exception;
+
+  /** Registers the given endpoint and returns a handle identifying that registration. */
+  public abstract RegistrationHandle register(DrillbitEndpoint data);
+
+  /** Removes the registration identified by the handle. */
+  public abstract void unregister(RegistrationHandle handle);
+
+  /** Returns the available Drillbit endpoints. Thread-safe; may be slightly out of date depending on refresh policy. */
+  public abstract List<DrillbitEndpoint> getAvailableEndpoints();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
new file mode 100644
index 0000000..289aa3c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/DrillServiceInstanceHelper.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillServiceInstance;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.netflix.curator.x.discovery.ServiceInstance;
+import com.netflix.curator.x.discovery.ServiceInstanceBuilder;
+import com.netflix.curator.x.discovery.details.InstanceSerializer;
+
+public class DrillServiceInstanceHelper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillServiceInstanceHelper.class);
+
+  public static final InstanceSerializer<DrillbitEndpoint> SERIALIZER = new DrillServiceInstanceSerializer();
+
+  /** Converts between Curator ServiceInstance objects and the DrillServiceInstance protobuf message. */
+  private static class DrillServiceInstanceSerializer implements InstanceSerializer<DrillbitEndpoint> {
+    /** Copies id, registration time and payload into a DrillServiceInstance and returns its bytes. */
+    @Override
+    public byte[] serialize(ServiceInstance<DrillbitEndpoint> i) throws Exception {
+      return DrillServiceInstance.newBuilder()
+          .setId(i.getId())
+          .setRegistrationTimeUTC(i.getRegistrationTimeUTC())
+          .setEndpoint(i.getPayload())
+          .build().toByteArray();
+    }
+
+    /** Rebuilds a ServiceInstance from the bytes; the name is always set to ExecConstants.SERVICE_NAME. */
+    @Override
+    public ServiceInstance<DrillbitEndpoint> deserialize(byte[] bytes) throws Exception {
+      DrillServiceInstance parsed = DrillServiceInstance.parseFrom(bytes);
+      return ServiceInstance.<DrillbitEndpoint>builder()
+          .id(parsed.getId())
+          .name(ExecConstants.SERVICE_NAME)
+          .registrationTimeUTC(parsed.getRegistrationTimeUTC())
+          .payload(parsed.getEndpoint())
+          .build();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
new file mode 100644
index 0000000..b3cd27f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKClusterCoordinator.java
@@ -0,0 +1,145 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import com.google.common.base.Throwables;
+import com.netflix.curator.RetryPolicy;
+import com.netflix.curator.framework.CuratorFramework;
+import com.netflix.curator.framework.CuratorFrameworkFactory;
+import com.netflix.curator.framework.state.ConnectionState;
+import com.netflix.curator.retry.RetryNTimes;
+import com.netflix.curator.x.discovery.ServiceDiscovery;
+import com.netflix.curator.x.discovery.ServiceDiscoveryBuilder;
+import com.netflix.curator.x.discovery.ServiceInstance;
+import com.netflix.curator.x.discovery.details.ServiceCache;
+import com.netflix.curator.x.discovery.details.ServiceCacheListener;
+
+/** Manages cluster coordination utilizing zookeeper. **/
+public class ZKClusterCoordinator extends ClusterCoordinator {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKClusterCoordinator.class);
+
+  private String basePath;
+  private CuratorFramework curator;
+  private ServiceDiscovery<DrillbitEndpoint> discovery;
+  private ServiceCache<DrillbitEndpoint> serviceCache;
+  private volatile List<DrillbitEndpoint> endpoints = Collections.emptyList();
+  private final String serviceName; // value configured under ExecConstants.SERVICE_NAME, not the key itself
+
+  public ZKClusterCoordinator(DrillConfig config) throws IOException {
+    this.basePath = config.getString(ExecConstants.ZK_ROOT);
+    this.serviceName = config.getString(ExecConstants.SERVICE_NAME);
+    RetryPolicy rp = new RetryNTimes(config.getInt(ExecConstants.ZK_RETRY_TIMES),
+        config.getInt(ExecConstants.ZK_RETRY_DELAY));
+    curator = CuratorFrameworkFactory.builder()
+        .connectionTimeoutMs(config.getInt(ExecConstants.ZK_TIMEOUT))
+        .retryPolicy(rp)
+        .connectString(config.getString(ExecConstants.ZK_CONNECTION))
+        .build();
+    discovery = getDiscovery();
+    serviceCache = discovery.serviceCacheBuilder().name(serviceName).refreshPaddingMs(config.getInt(ExecConstants.ZK_REFRESH)).build();
+  }
+
+  @Override
+  public void start() throws Exception {
+    logger.debug("Starting ZKClusterCoordination.");
+    curator.start();
+    discovery.start();
+    serviceCache.start();
+    serviceCache.addListener(new ZKListener());
+    refreshEndpoints(); // load the initial list; otherwise endpoints stays empty until the first cache change
+  }
+
+  /** Queries the instances registered under serviceName and publishes their payloads as the endpoint list. */
+  private void refreshEndpoints() {
+    try {
+      Collection<ServiceInstance<DrillbitEndpoint>> instances = discovery.queryForInstances(serviceName);
+      List<DrillbitEndpoint> newEndpoints = new ArrayList<DrillbitEndpoint>(instances.size());
+      for (ServiceInstance<DrillbitEndpoint> si : instances) {
+        newEndpoints.add(si.getPayload());
+      }
+      endpoints = newEndpoints; // single volatile write: readers see the old or the new list, never a partial one
+    } catch (Exception e) {
+      logger.error("Failure while update Drillbit service location cache.", e);
+    }
+  }
+
+  private class ZKListener implements ServiceCacheListener {
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {}
+
+    @Override
+    public void cacheChanged() {
+      logger.debug("Cache changed, updating.");
+      refreshEndpoints();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    serviceCache.close();
+    discovery.close();
+    curator.close();
+  }
+
+  @Override
+  public RegistrationHandle register(DrillbitEndpoint data) {
+    try {
+      ServiceInstance<DrillbitEndpoint> si = getSI(data);
+      discovery.registerService(si);
+      return new ZKRegistrationHandle(si.getId());
+    } catch (Exception e) {
+      throw Throwables.propagate(e); // propagate() always throws; the explicit throw replaces a dead "return null"
+    }
+  }
+
+  @Override
+  public void unregister(RegistrationHandle handle) {
+    if( !( handle instanceof ZKRegistrationHandle)) throw new UnsupportedOperationException("Unknown handle type");
+    ZKRegistrationHandle h = (ZKRegistrationHandle) handle;
+    try {
+      ServiceInstance<DrillbitEndpoint> si = ServiceInstance.<DrillbitEndpoint>builder().address("").port(0).id(h.id).name(serviceName).build();
+      discovery.unregisterService(si);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public List<DrillbitEndpoint> getAvailableEndpoints() {
+    return this.endpoints;
+  }
+
+  /** Wraps an endpoint in a ServiceInstance named serviceName, the same name the cache and queries use. */
+  private ServiceInstance<DrillbitEndpoint> getSI(DrillbitEndpoint ep) throws Exception {
+    return ServiceInstance.<DrillbitEndpoint>builder().name(serviceName).payload(ep).build();
+  }
+
+  public ServiceDiscovery<DrillbitEndpoint> getDiscovery() {
+    return ServiceDiscoveryBuilder.builder(DrillbitEndpoint.class).basePath(basePath).client(curator).serializer(DrillServiceInstanceHelper.SERIALIZER).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKRegistrationHandle.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKRegistrationHandle.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKRegistrationHandle.java
new file mode 100644
index 0000000..b8a7648
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/coord/ZKRegistrationHandle.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.coord;
+
+import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
+
+/**
+ * {@link RegistrationHandle} implementation that simply carries a string id.
+ */
+public class ZKRegistrationHandle implements RegistrationHandle {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZKRegistrationHandle.class);
+
+  /** Identifier supplied at construction time; it never changes afterwards. */
+  public final String id;
+
+  /**
+   * @param id identifier to store in this handle
+   */
+  public ZKRegistrationHandle(String id) {
+    // No explicit super(): the compiler inserts the same no-arg call implicitly.
+    this.id = id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/disk/Spool.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/disk/Spool.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/disk/Spool.java
new file mode 100644
index 0000000..346b531
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/disk/Spool.java
@@ -0,0 +1,29 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.disk;
+
+import java.io.IOException;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Contract for writing {@link RecordBatch} data to a spool and reading it back.
+ * NOTE(review): the package name suggests a disk-backed implementation, but nothing
+ * in this interface fixes the storage medium -- confirm against implementations.
+ */
+public interface Spool {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Spool.class);
+
+  /**
+   * Writes the given batch to the spool.
+   *
+   * @param batch batch to write
+   * @throws IOException if the write fails
+   */
+  public void write(RecordBatch batch) throws IOException;
+
+  /**
+   * Reads from the spool using the given batch.
+   * NOTE(review): presumably the batch is the destination that gets filled -- confirm.
+   *
+   * @param batch batch involved in the read
+   * @throws IOException if the read fails
+   */
+  public void read(RecordBatch batch) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/DrillbitStartupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/DrillbitStartupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/DrillbitStartupException.java
new file mode 100644
index 0000000..3c36171
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/DrillbitStartupException.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+/**
+ * {@link DrillException} subtype whose constructors all delegate straight to the superclass.
+ * NOTE(review): judging by the name it is meant for failures while a drillbit starts --
+ * confirm against the call sites.
+ */
+public class DrillbitStartupException extends DrillException {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStartupException.class);
+
+  /** Creates an exception with neither message nor cause. */
+  public DrillbitStartupException() {
+    super();
+  }
+
+  /**
+   * @param message detail message
+   */
+  public DrillbitStartupException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause underlying failure
+   */
+  public DrillbitStartupException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message
+   * @param cause underlying failure
+   */
+  public DrillbitStartupException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Full-control constructor; both flags are forwarded unchanged to {@link DrillException}.
+   *
+   * @param message detail message
+   * @param cause underlying failure
+   * @param enableSuppression forwarded to the superclass
+   * @param writableStackTrace forwarded to the superclass
+   */
+  public DrillbitStartupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
new file mode 100644
index 0000000..a4899bd
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/ExecutionSetupException.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+/**
+ * {@link DrillException} subtype raised when an execution component cannot be set up;
+ * for example, {@code ScanBatch} throws it when it is constructed without any reader.
+ * All constructors delegate straight to the superclass.
+ */
+public class ExecutionSetupException extends DrillException {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecutionSetupException.class);
+
+  /** Creates an exception with neither message nor cause. */
+  public ExecutionSetupException() {
+    super();
+  }
+
+  /**
+   * @param message detail message
+   */
+  public ExecutionSetupException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause underlying failure
+   */
+  public ExecutionSetupException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message
+   * @param cause underlying failure
+   */
+  public ExecutionSetupException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Full-control constructor; both flags are forwarded unchanged to {@link DrillException}.
+   *
+   * @param message detail message
+   * @param cause underlying failure
+   * @param enableSuppression forwarded to the superclass
+   * @param writableStackTrace forwarded to the superclass
+   */
+  public ExecutionSetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeException.java
new file mode 100644
index 0000000..24883aa
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SchemaChangeException.java
@@ -0,0 +1,52 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+/**
+ * {@link DrillException} subtype declared by the {@code OutputMutator} methods for
+ * schema modifications that cannot be carried out. Besides the usual delegating
+ * constructors it offers two that build the message with {@link String#format}.
+ */
+public class SchemaChangeException extends DrillException {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SchemaChangeException.class);
+
+  /** Creates an exception with neither message nor cause. */
+  public SchemaChangeException() {
+    super();
+  }
+
+  /**
+   * @param message detail message, used verbatim
+   */
+  public SchemaChangeException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause underlying failure
+   */
+  public SchemaChangeException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message, used verbatim (no formatting is applied)
+   * @param cause underlying failure
+   */
+  public SchemaChangeException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Full-control constructor; both flags are forwarded unchanged to {@link DrillException}.
+   *
+   * @param message detail message
+   * @param cause underlying failure
+   * @param enableSuppression forwarded to the superclass
+   * @param writableStackTrace forwarded to the superclass
+   */
+  public SchemaChangeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+
+  /**
+   * Builds the detail message with {@code String.format(message, objects)}.
+   * A call that passes only a message binds to the plain {@code (String)} constructor,
+   * and one that passes a message plus a single {@code Throwable} binds to
+   * {@code (String, Throwable)}; neither of those formats the message.
+   *
+   * @param message format pattern
+   * @param objects format arguments
+   * @throws java.util.IllegalFormatException if pattern and arguments do not match
+   */
+  public SchemaChangeException(String message, Object... objects) {
+    super(String.format(message, objects));
+  }
+
+  /**
+   * Same as the formatting constructor above, additionally recording a cause.
+   *
+   * @param message format pattern
+   * @param cause underlying failure
+   * @param objects format arguments
+   * @throws java.util.IllegalFormatException if pattern and arguments do not match
+   */
+  public SchemaChangeException(String message, Throwable cause, Object... objects) {
+    super(String.format(message, objects), cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SetupException.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SetupException.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SetupException.java
new file mode 100644
index 0000000..f249f13
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/exception/SetupException.java
@@ -0,0 +1,46 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.exception;
+
+import org.apache.drill.common.exceptions.DrillException;
+
+/**
+ * {@link DrillException} subtype whose constructors all delegate straight to the superclass.
+ * NOTE(review): the intended use is not visible in this file; the name suggests a
+ * generic setup failure -- confirm against the call sites.
+ */
+public class SetupException extends DrillException {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetupException.class);
+
+  /** Creates an exception with neither message nor cause. */
+  public SetupException() {
+    super();
+  }
+
+  /**
+   * @param message detail message
+   */
+  public SetupException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param cause underlying failure
+   */
+  public SetupException(Throwable cause) {
+    super(cause);
+  }
+
+  /**
+   * @param message detail message
+   * @param cause underlying failure
+   */
+  public SetupException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  /**
+   * Full-control constructor; both flags are forwarded unchanged to {@link DrillException}.
+   *
+   * @param message detail message
+   * @param cause underlying failure
+   * @param enableSuppression forwarded to the superclass
+   * @param writableStackTrace forwarded to the superclass
+   */
+  public SetupException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+    super(message, cause, enableSuppression, writableStackTrace);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BatchIterator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BatchIterator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BatchIterator.java
new file mode 100644
index 0000000..2ebbef5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BatchIterator.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+import parquet.schema.MessageType;
+
+public interface BatchIterator {
+  static enum IterOutcome{NONE, FULL_NEW_SCHEMA, FULL, PARTIAL_NEW_SCHEMA, PARTIAL, STOP}
+  public RecordBatch getBatch();
+  public FragmentContext getContext();
+  public MessageType getSchema();
+  public void kill(QueryOutcome outcome);
+  public IterOutcome next();
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
new file mode 100644
index 0000000..be1081f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -0,0 +1,49 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.rpc.bit.BitCom;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.StorageEngine;
+
+/**
+ * Per-fragment wrapper around the owning {@link DrillbitContext}.
+ * Apart from {@link #getDrillbitContext()}, every method is still a stub.
+ */
+public class FragmentContext {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
+
+  private final DrillbitContext context;
+
+  /**
+   * @param context drillbit-level context to expose through this fragment context
+   */
+  public FragmentContext(DrillbitContext context) {
+    this.context = context;
+  }
+
+  /** Returns the drillbit context supplied at construction. */
+  public DrillbitContext getDrillbitContext() {
+    return context;
+  }
+
+  /**
+   * Reports a failure of this fragment.
+   *
+   * @param cause the failure being reported
+   */
+  public void fail(Throwable cause) {
+    // Stub: the cause is currently discarded.
+  }
+
+  /**
+   * Stub: always returns {@code null}, whatever configuration is passed.
+   *
+   * @param config storage engine configuration (currently ignored)
+   */
+  public StorageEngine getStorageEngine(StorageEngineConfig config) {
+    return null;
+  }
+
+  /** Stub: always returns {@code null}. */
+  public BitCom getCommunicator() {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
new file mode 100644
index 0000000..59abdc4
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OutputMutator.java
@@ -0,0 +1,28 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+/**
+ * Callback through which the fields and schema of an output batch are modified.
+ */
+public interface OutputMutator {
+
+  /**
+   * Removes the field with the given id.
+   *
+   * @param fieldId id of the field to remove
+   * @throws SchemaChangeException if the field cannot be removed
+   */
+  public void removeField(int fieldId) throws SchemaChangeException;
+
+  /**
+   * Registers a value vector under the given field id.
+   *
+   * @param fieldId id of the field
+   * @param vector vector holding the field's values
+   * @throws SchemaChangeException if the field cannot be added
+   */
+  public void addField(int fieldId, ValueVector<?> vector) throws SchemaChangeException;
+
+  /**
+   * Installs a new schema.
+   *
+   * @param schema schema to install
+   * @throws SchemaChangeException if the schema cannot be applied
+   */
+  public void setNewSchema(BatchSchema schema) throws SchemaChangeException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryOutcome.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryOutcome.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryOutcome.java
new file mode 100644
index 0000000..b737f7c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryOutcome.java
@@ -0,0 +1,22 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+/**
+ * Placeholder type: it currently declares nothing but a logger. It is the argument
+ * type of {@code BatchIterator.kill(QueryOutcome)}.
+ */
+public class QueryOutcome {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryOutcome.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
new file mode 100644
index 0000000..88b8af2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/ScanBatch.java
@@ -0,0 +1,157 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops;
+
+import java.util.Iterator;
+
+import org.apache.drill.exec.exception.ExecutionSetupException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.InvalidValueAccessor;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.vector.ValueVector;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.carrotsearch.hppc.procedures.IntObjectProcedure;
+
+/**
+ * Record batch used for a particular scan. Operators against one or more
+ */
+public class ScanBatch implements RecordBatch {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+
+  private IntObjectOpenHashMap<ValueVector<?>> fields = new IntObjectOpenHashMap<ValueVector<?>>();
+  private BatchSchema schema;
+  private int recordCount;
+  private boolean schemaChanged = true;
+  private final FragmentContext context;
+  private Iterator<RecordReader> readers;
+  private RecordReader currentReader;
+  private final BatchSchema expectedSchema;
+  private final Mutator mutator = new Mutator();
+
+  public ScanBatch(BatchSchema expectedSchema, Iterator<RecordReader> readers, FragmentContext context)
+      throws ExecutionSetupException {
+    this.expectedSchema = expectedSchema;
+    this.context = context;
+    this.readers = readers;
+    if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader.");
+    this.currentReader = readers.next();
+    this.currentReader.setup(expectedSchema, mutator);
+  }
+
+  private void schemaChanged() {
+    schema = null;
+    schemaChanged = true;
+  }
+
+  @Override
+  public FragmentContext getContext() {
+    return context;
+  }
+
+  @Override
+  public BatchSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public void kill() {
+    releaseAssets();
+  }
+
+  private void releaseAssets() {
+    fields.forEach(new IntObjectProcedure<ValueVector<?>>() {
+      @Override
+      public void apply(int key, ValueVector<?> value) {
+        value.close();
+      }
+    });
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T extends ValueVector<T>> T getValueVector(int fieldId, Class<T> clazz) throws InvalidValueAccessor {
+    if (fields.containsKey(fieldId))
+      throw new InvalidValueAccessor(String.format("Unknown value accesor for field id %d."));
+    ValueVector<?> vector = this.fields.lget();
+    if (vector.getClass().isAssignableFrom(clazz)) {
+      return (T) vector;
+    } else {
+      throw new InvalidValueAccessor(String.format(
+          "You requested a field accessor of type %s for field id %d but the actual type was %s.",
+          clazz.getCanonicalName(), fieldId, vector.getClass().getCanonicalName()));
+    }
+  }
+
+  @Override
+  public IterOutcome next() {
+    while ((recordCount = currentReader.next()) == 0) {
+      try {
+        if (!readers.hasNext()) {
+          currentReader.cleanup();
+          releaseAssets();
+          return IterOutcome.NONE;
+        }
+        currentReader.cleanup();
+        currentReader = readers.next();
+        currentReader.setup(expectedSchema, mutator);
+      } catch (ExecutionSetupException e) {
+        this.context.fail(e);
+        releaseAssets();
+        return IterOutcome.STOP;
+      }
+    }
+
+    if (schemaChanged) {
+      schemaChanged = false;
+      return IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      return IterOutcome.OK;
+    }
+  }
+
+  private class Mutator implements OutputMutator {
+
+    public void removeField(int fieldId) throws SchemaChangeException {
+      schemaChanged();
+      ValueVector<?> v = fields.remove(fieldId);
+      if (v == null) throw new SchemaChangeException("Failure attempting to remove an unknown field.");
+      v.close();
+    }
+
+    public void addField(int fieldId, ValueVector<?> vector) {
+      schemaChanged();
+      ValueVector<?> v = fields.put(fieldId, vector);
+      if (v != null) v.close();
+    }
+
+    @Override
+    public void setNewSchema(BatchSchema schema) {
+      ScanBatch.this.schema = schema;
+      ScanBatch.this.schemaChanged = true;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
new file mode 100644
index 0000000..6640ef2
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/PartitioningSender.java
@@ -0,0 +1,23 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.exchange;
+
+
+/**
+ * Placeholder type: it currently declares nothing but a logger.
+ * NOTE(review): name and package suggest the sending side of a partitioning
+ * exchange -- confirm once an implementation exists.
+ */
+public class PartitioningSender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PartitioningSender.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
new file mode 100644
index 0000000..c9f8147
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RandomReceiver.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.exchange;
+
+/**
+ * Placeholder type: it currently declares nothing but a logger.
+ * NOTE(review): name and package suggest the receiving side of an exchange --
+ * confirm once an implementation exists.
+ */
+public class RandomReceiver {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomReceiver.class);
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
new file mode 100644
index 0000000..0e35932
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ops/exchange/RecordBatchSender.java
@@ -0,0 +1,24 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.ops.exchange;
+
+/** Placeholder class: it currently declares only a logger and has no behavior. */
+public class RecordBatchSender {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RecordBatchSender.class);
+  // TODO: no fields or methods have been implemented yet.
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
new file mode 100644
index 0000000..70a42be
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
@@ -0,0 +1,40 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.opt;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.common.optimize.Optimizer;
+import org.apache.drill.common.physical.PhysicalPlan;
+
+/** Optimizer stub: init and close do nothing, and optimize does not produce a physical plan yet. */
+public class IdentityOptimizer extends Optimizer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
+  @Override
+  public void init(DrillConfig config) { // the config is not used; nothing to initialize
+  }
+
+  @Override
+  public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
+    return null; // TODO(review): placeholder -- no PhysicalPlan is built from the logical plan yet
+  }
+
+  @Override
+  public void close() { // holds no resources, so there is nothing to release
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
new file mode 100644
index 0000000..9554bf3
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ExecPlanner.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.planner;
+
+
+/**
+ * Intended to decide the level of parallelization and to generate smaller physical plans.
+ * Nothing is implemented yet: the class currently declares only a logger.
+ */
+public class ExecPlanner {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExecPlanner.class);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b53933f2/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
new file mode 100644
index 0000000..1d32340
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -0,0 +1,123 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.types.DataType;
+import org.apache.drill.common.physical.RecordField.ValueMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.vector.ValueVector;
+
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.collect.Lists;
+
+/** Schema of a record batch: a list of {@link MaterializedField}s that is iterated in list order. */
+public class BatchSchema implements Iterable<MaterializedField>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchSchema.class);
+  private final List<MaterializedField> fields;
+  
+  private BatchSchema(List<MaterializedField> fields) {
+    this.fields = fields;
+  }
+
+  @Override
+  public Iterator<MaterializedField> iterator() {
+    return fields.iterator();
+  }
+
+  /** Appends a field without type information: DataType.LATEBIND with Void.class as its value class. */
+  public void addAnyField(short fieldId, boolean nullable, ValueMode mode){
+    addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+  }
+  
+  /** Appends a new MaterializedField built from the arguments.  No duplicate fieldId check is made here. */
+  public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass){
+    fields.add(new MaterializedField(fieldId, type, nullable, mode, valueClass));
+  }
+  
+  /**
+   * Builder to build BatchSchema.  Can have a supporting expected schema.  If one is supplied, every field set on the builder must use a fieldId found in the expected schema and is checked against that expected field with checkMaterialization.
+   */
+  public class BatchSchemaBuilder{
+    private final IntObjectOpenHashMap<MaterializedField> fields = new IntObjectOpenHashMap<MaterializedField>();
+    private IntObjectOpenHashMap<MaterializedField> expectedFields; // stays null unless an expected schema was supplied
+    
+    public BatchSchemaBuilder(BatchSchema expected){
+      expectedFields = new IntObjectOpenHashMap<MaterializedField>(); // expected fields indexed by fieldId for setTypedField
+      for(MaterializedField f: expected){
+        expectedFields.put(f.getFieldId(), f);
+      }
+    }
+    
+    public BatchSchemaBuilder(){ // no expected schema: expectedFields stays null and setTypedField skips validation
+    }
+    
+    /**
+     * Add a field where we don't have type information.  In this case, DataType will be set to LATEBIND and valueClass will be set to Void.class.
+     * @param fieldId The desired fieldId.  Should be unique for this BatchSchema.
+     * @param nullable Whether this field supports nullability.
+     * @param mode The ValueMode recorded for the field.
+     * @throws SchemaChangeException If the fieldId is already present in this builder or the field fails the expected schema check.
+     */
+    public void addLateBindField(short fieldId, boolean nullable, ValueMode mode) throws SchemaChangeException{
+      addTypedField(fieldId, DataType.LATEBIND, nullable, mode, Void.class);
+    }
+    
+    /** Stores the field under its fieldId, first validating it against the expected schema when one was supplied. */
+    private void setTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
+      MaterializedField f = new MaterializedField(fieldId, type, nullable, mode, valueClass);
+      if(expectedFields != null){
+        if(!expectedFields.containsKey(f.getFieldId())) throw new SchemaChangeException(String.format("An attempt was made to add a field that is not part of the expected schema.  The offending fieldId was %d", fieldId));
+        f.checkMaterialization(expectedFields.lget()); // lget() returns the value located by the containsKey() call above
+      }
+      fields.put(f.getFieldId(), f);
+    }
+    
+    /** Adds a field; throws SchemaChangeException when the fieldId is already present in this builder. */
+    public void addTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
+      if(fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to add a duplicate fieldId to the schema.  The offending fieldId was %d", fieldId));
+      setTypedField(fieldId, type, nullable, mode, valueClass);
+    }
+    
+    /** Replaces a field; throws SchemaChangeException when the fieldId is not yet present in this builder. */
+    public void replaceTypedField(short fieldId, DataType type, boolean nullable, ValueMode mode, Class<?> valueClass) throws SchemaChangeException{
+      if(!fields.containsKey(fieldId)) throw new SchemaChangeException(String.format("An attempt was made to replace a field in the schema, however the schema does not currently contain that field id.  The offending fieldId was %d", fieldId));
+      setTypedField(fieldId, type, nullable, mode, valueClass);
+    }
+    
+    public void addVector(ValueVector<?> v){ // TODO: not implemented yet, the vector is ignored
+    }
+    
+    public void replaceVector(ValueVector<?> oldVector, ValueVector<?> newVector){ // TODO: not implemented yet, both vectors are ignored
+    }
+    
+    /** Builds a BatchSchema from the fields set so far, ordered by Collections.sort.  NOTE(review): the builder is not cleared despite the method name, and the check for unaccounted expected fields is still TODO. */
+    public BatchSchema buildAndClear() throws SchemaChangeException{
+      // Iterate through cursors: the public 'values' array is the raw hash table (an Object[] at runtime, including unused slots).
+      List<MaterializedField> fieldList = Lists.newArrayList();
+      for(com.carrotsearch.hppc.cursors.ObjectCursor<MaterializedField> c : fields.values()){
+        if(c.value != null) fieldList.add(c.value);
+      }
+      Collections.sort(fieldList);
+      return new BatchSchema(fieldList);
+    }
+  }
+}