You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/05/14 03:52:46 UTC

[06/13] Update typing system. Update RPC system. Add Fragmenting Implementation. Working single node. Distributed failing due to threading issues.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
deleted file mode 100644
index 83ad599..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/BitVector.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-/**
- * Describes a vector which holds a number of true/false values.
- */
-public class BitVector extends AbstractFixedValueVector<BitVector> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitVector.class);
-
-  private final MaterializedField field;
-  
-  public BitVector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, 1);
-    this.field = new MaterializedField(fieldId, DataType.BOOLEAN, false, ValueMode.VECTOR, this.getClass());
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-  
-//  /** Returns true or false for the specified bit index.
-//   * The index should be less than the OpenBitSet size
-//   */
-//  public boolean get(int index) {
-//    assert index >= 0 && index < this.valueCount;
-//    int i = index >> 3;               // div 8
-//    // signed shift will keep a negative index and force an
-//    // array-index-out-of-bounds-exception, removing the need for an explicit check.
-//    int bit = index & 0x3f;           // mod 64
-//    long bitmask = 1L << bit;
-//    return (data.getLong(i) & bitmask) != 0;
-//  }
-  
-  public int getBit(int index) {
-    
-    assert index >= 0 && index < this.valueCount;
-    int i = 8*(index >> 6); // div 8
-    int bit = index & 0x3f; // mod 64
-    return ((int) (data.getLong(i) >>> bit)) & 0x01;
-  }
-  
-  /** Sets the bit at the specified index.
-   * The index should be less than the OpenBitSet size.
-   */
-   public void set(int index) {
-     assert index >= 0 && index < this.valueCount;
-     int wordNum = index >> 3;   
-     int bit = index & 0x3f;
-     long bitmask = 1L << bit;
-     data.setLong(wordNum, data.getLong(wordNum) | bitmask);
-   }
-   
-   public void clear(int index) {
-     assert index >= 0 && index < this.valueCount;
-     int wordNum = index >> 3;
-     int bit = index & 0x03f;
-     long bitmask = 1L << bit;
-     data.setLong(wordNum, data.getLong(wordNum) & ~bitmask);
-   }
-   
-   
-   
-   /** Clears a range of bits.  Clearing past the end does not change the size of the set.
-   *
-   * @param startBitIndex lower index
-   * @param lastBitIndex one-past the last bit to clear
-   */
-  private void clear2(int startBitIndex, int lastBitIndex) {
-    if (lastBitIndex <= startBitIndex) return;
-
-    int firstWordStart = (startBitIndex>>3);
-    if (firstWordStart >= this.longWords) return;
-
-    // since endIndex is one past the end, this is index of the last
-    // word to be changed.
-    int lastWordStart   = ((lastBitIndex-1)>>3);
-
-    long startmask = -1L << startBitIndex;
-    long endmask = -1L >>> -lastBitIndex;  // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
-
-    // invert masks since we are clearing
-    startmask = ~startmask;
-    endmask = ~endmask;
-
-    if (firstWordStart == lastWordStart) {
-      data.setLong(firstWordStart,  data.getLong(firstWordStart) & (startmask | endmask));
-      return;
-    }
-    data.setLong(firstWordStart,  data.getLong(firstWordStart) & startmask);
-
-    int middle = Math.min(this.longWords, lastWordStart);
-    
-    for(int i =firstWordStart+8; i < middle; i += 8){
-      data.setLong(i, 0L);
-    }
-    if (lastWordStart < this.longWords) {
-      data.setLong(lastWordStart,  data.getLong(lastWordStart) & endmask);
-    }
-  }
-  
-  public void setAllFalse(){
-    clear(0, valueCount);
-  }
-
-  
-  public void clear(int startIndex, int endIndex) {
-    if (endIndex <= startIndex) return;
-
-    int startWord = (startIndex >> 6);
-    if (startWord >= longWords) return;
-
-    // since endIndex is one past the end, this is index of the last
-    // word to be changed.
-    int endWord = ((endIndex - 1) >> 6);
-
-    long startmask = -1L << startIndex;
-    long endmask = -1L >>> -endIndex; // 64-(endIndex&0x3f) is the same as -endIndex due to wrap
-
-    // invert masks since we are clearing
-    startmask = ~startmask;
-    endmask = ~endmask;
-    
-    int startWordPos = startWord * 8;
-    if (startWord == endWord) {
-      data.setLong(startWordPos, data.getLong(startWordPos) & (startmask | endmask));
-      return;
-    }
-
-    int endWordPos = endWord * 8;
-
-    data.setLong(startWordPos, data.getLong(startWordPos) & startmask);
-
-    int middle = Math.min(longWords, endWord)*8;
-    
-    
-    for(int i =startWordPos+8; i < middle; i += 8){
-      data.setLong(i, 0L);
-    }
-    
-    if (endWordPos < startWordPos) {
-      data.setLong(endWordPos, data.getLong(endWordPos) & endmask);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
deleted file mode 100644
index d8e1c80..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ByteVector.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-
-public class ByteVector extends AbstractFixedValueVector<ByteVector>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ByteVector.class);
-
-  private final MaterializedField field;
-
-  public ByteVector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, 8);
-    this.field = new MaterializedField(fieldId, DataType.SIGNED_BYTE, false, ValueMode.VECTOR, this.getClass());
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  public void setByte(int index, byte b){
-    data.setByte(index, b);
-  }
-
-  public byte getByte(int index){
-    return data.getByte(index);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
new file mode 100644
index 0000000..82c86d1
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed1.java
@@ -0,0 +1,43 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector whose accessors read and write a single byte per index in the
+ * inherited {@code data} buffer.
+ */
+public class Fixed1 extends AbstractFixedValueVector<Fixed1> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed1.class);
+
+  // Third super-constructor argument. Presumably the per-value width in bits -- TODO confirm
+  // against AbstractFixedValueVector.
+  private static final int WIDTH = 8;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 8 to the superclass.
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed1(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, WIDTH);
+  }
+
+  /**
+   * Writes {@code b} at position {@code index} of the backing buffer. The index is used as-is,
+   * without scaling.
+   */
+  public void setByte(int index, byte b) {
+    data.setByte(index, b);
+  }
+
+  /**
+   * Reads the byte stored at position {@code index} of the backing buffer.
+   *
+   * @return the byte at {@code index}
+   */
+  public byte getByte(int index) {
+    final byte stored = data.getByte(index);
+    return stored;
+  }
+
+  /**
+   * Returns the value at {@code index} as a boxed {@link Byte}, obtained through
+   * {@link #getByte(int)}.
+   */
+  @Override
+  public Object getObject(int index) {
+    return Byte.valueOf(getByte(index));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
new file mode 100644
index 0000000..c5f641a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed12.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector declared with a width of {@code 12 * 8}. It exposes no typed
+ * accessors of its own.
+ */
+public class Fixed12 extends AbstractFixedValueVector<Fixed12> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed12.class);
+
+  // Number of bytes per value. Multiplied by 8 for the superclass, which presumably expects
+  // the width in bits -- TODO confirm against AbstractFixedValueVector.
+  private static final int BYTES_PER_VALUE = 12;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 96 to the superclass.
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed12(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, BYTES_PER_VALUE * 8);
+  }
+
+  /**
+   * Object access is not implemented for this width.
+   *
+   * @return always {@code null}, regardless of {@code index}
+   */
+  @Override
+  public Object getObject(int index) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
new file mode 100644
index 0000000..649832b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed16.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector declared with a width of {@code 16 * 8}. It exposes no typed
+ * accessors of its own.
+ */
+public class Fixed16 extends AbstractFixedValueVector<Fixed16> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed16.class);
+
+  // Number of bytes per value. Multiplied by 8 for the superclass, which presumably expects
+  // the width in bits -- TODO confirm against AbstractFixedValueVector.
+  private static final int BYTES_PER_VALUE = 16;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 128 to the
+   * superclass.
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed16(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, BYTES_PER_VALUE * 8);
+  }
+
+  /**
+   * Object access is not implemented for this width.
+   *
+   * @return always {@code null}, regardless of {@code index}
+   */
+  @Override
+  public Object getObject(int index) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
new file mode 100644
index 0000000..bd0e313
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed2.java
@@ -0,0 +1,53 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector holding two-byte values. Value {@code i} lives at byte offset
+ * {@code i * 2} of the inherited {@code data} buffer.
+ */
+public class Fixed2 extends AbstractFixedValueVector<Fixed2> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed2.class);
+
+  // Bytes occupied by one value; also the stride used to turn a value index into a byte offset.
+  private static final int BYTES_PER_VALUE = 2;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 16 to the superclass
+   * (presumably a bit width -- TODO confirm against AbstractFixedValueVector).
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed2(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, BYTES_PER_VALUE * 8);
+  }
+
+  /** Stores {@code value} in slot {@code index}, i.e. at byte offset {@code index * 2}. */
+  public final void setSmallInt(int index, short value) {
+    data.setShort(index * BYTES_PER_VALUE, value);
+  }
+
+  /**
+   * Loads the short held in slot {@code index}, i.e. at byte offset {@code index * 2}.
+   *
+   * @return the stored short
+   */
+  public final short getSmallInt(int index) {
+    final int byteOffset = index * BYTES_PER_VALUE;
+    return data.getShort(byteOffset);
+  }
+
+  /** Alias of {@link #setSmallInt(int, short)}; the bits are stored unchanged. */
+  public final void setUInt2(int index, short value) {
+    setSmallInt(index, value);
+  }
+
+  /** Alias of {@link #getSmallInt(int)}; the bits are returned unchanged. */
+  public final short getUInt2(int index) {
+    return getSmallInt(index);
+  }
+
+  /** Returns the value in slot {@code index} as a boxed {@link Short}. */
+  @Override
+  public Object getObject(int index) {
+    return Short.valueOf(getSmallInt(index));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
new file mode 100644
index 0000000..650029d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed4.java
@@ -0,0 +1,55 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector holding four-byte values, accessible either as {@code int} or as
+ * {@code float}. Value {@code i} lives at byte offset {@code i * 4} of the inherited
+ * {@code data} buffer for both views.
+ */
+public class Fixed4 extends AbstractFixedValueVector<Fixed4>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed4.class);
+
+  // Bytes occupied by one value; the single stride shared by every accessor in this class.
+  private static final int BYTES_PER_VALUE = 4;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 32 to the superclass
+   * (presumably a bit width -- TODO confirm against AbstractFixedValueVector).
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed4(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, BYTES_PER_VALUE * 8);
+  }
+
+  /** Stores {@code value} in slot {@code index}, i.e. at byte offset {@code index * 4}. */
+  public final void setInt(int index, int value){
+    data.setInt(index * BYTES_PER_VALUE, value);
+  }
+
+  /**
+   * Loads the int held in slot {@code index}, i.e. at byte offset {@code index * 4}.
+   *
+   * @return the stored int
+   */
+  public final int getInt(int index){
+    return data.getInt(index * BYTES_PER_VALUE);
+  }
+
+  /**
+   * Stores {@code value} in slot {@code index}, i.e. at byte offset {@code index * 4}.
+   */
+  public final void setFloat4(int index, float value){
+    // A float4 is four bytes wide, so it must use the same 4-byte stride as setInt/getInt.
+    // A stride of 8 would leave gaps between values and address bytes beyond 4 * valueCount.
+    data.setFloat(index * BYTES_PER_VALUE, value);
+  }
+
+  /**
+   * Loads the float held in slot {@code index}, i.e. at byte offset {@code index * 4}.
+   *
+   * @return the stored float
+   */
+  public final float getFloat4(int index){
+    // Same 4-byte stride as setFloat4 so that reads line up with writes and with getObject.
+    return data.getFloat(index * BYTES_PER_VALUE);
+  }
+
+  /** Returns the value in slot {@code index}, read as an int, boxed as an {@link Integer}. */
+  @Override
+  public Object getObject(int index) {
+    return getInt(index);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
new file mode 100644
index 0000000..3629f5c
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Fixed8.java
@@ -0,0 +1,58 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector holding eight-byte values, accessible either as {@code long} or as
+ * {@code double}. Value {@code i} lives at byte offset {@code i * 8} of the inherited
+ * {@code data} buffer.
+ */
+public class Fixed8 extends AbstractFixedValueVector<Fixed8> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Fixed8.class);
+
+  // Bytes occupied by one value; also the stride used to turn a value index into a byte offset.
+  private static final int BYTES_PER_VALUE = 8;
+
+  /**
+   * Creates the vector, forwarding the field, the allocator and a width of 64 to the superclass
+   * (presumably a bit width -- TODO confirm against AbstractFixedValueVector).
+   *
+   * @param field field descriptor handed to the superclass
+   * @param allocator allocator handed to the superclass
+   */
+  public Fixed8(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, BYTES_PER_VALUE * 8);
+  }
+
+  /** Stores {@code value} in slot {@code index}, i.e. at byte offset {@code index * 8}. */
+  public final void setBigInt(int index, long value) {
+    data.setLong(index * BYTES_PER_VALUE, value);
+  }
+
+  /**
+   * Loads the long held in slot {@code index}, i.e. at byte offset {@code index * 8}.
+   *
+   * @return the stored long
+   */
+  public final long getBigInt(int index) {
+    final int byteOffset = index * BYTES_PER_VALUE;
+    return data.getLong(byteOffset);
+  }
+
+  /** Stores {@code value} in slot {@code index}, i.e. at byte offset {@code index * 8}. */
+  public final void setFloat8(int index, double value) {
+    data.setDouble(index * BYTES_PER_VALUE, value);
+  }
+
+  /**
+   * Loads the double held in slot {@code index}, i.e. at byte offset {@code index * 8}.
+   *
+   * @return the stored double
+   */
+  public final double getFloat8(int index) {
+    final int byteOffset = index * BYTES_PER_VALUE;
+    return data.getDouble(byteOffset);
+  }
+
+  /** Returns the value in slot {@code index}, read as a long, boxed as a {@link Long}. */
+  @Override
+  public Object getObject(int index) {
+    return Long.valueOf(getBigInt(index));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
new file mode 100644
index 0000000..594af23
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/FixedLen.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/**
+ * Fixed-width value vector whose width is taken from the field descriptor instead of being
+ * hard-coded. The value accessors are placeholders that currently do nothing.
+ */
+public class FixedLen extends AbstractFixedValueVector<FixedLen> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FixedLen.class);
+
+  /**
+   * Creates the vector, forwarding {@code field.getWidth()} as the width.
+   *
+   * <p>NOTE(review): the unit of {@code getWidth()} is not visible here -- confirm it matches
+   * what AbstractFixedValueVector expects.
+   *
+   * @param field field descriptor; must be non-null because its width is read here
+   * @param allocator allocator handed to the superclass
+   */
+  public FixedLen(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, field.getWidth());
+  }
+
+  /** Not implemented yet: the buffer {@code b} is ignored and the vector is left unchanged. */
+  public void set(ByteBuf b) {
+    // intentionally empty
+  }
+
+  /** Not implemented yet: nothing is written into {@code b}. */
+  public void get(ByteBuf b) {
+    // intentionally empty
+  }
+
+  /**
+   * Object access is not implemented for this vector.
+   *
+   * @return always {@code null}, regardless of {@code index}
+   */
+  @Override
+  public Object getObject(int index) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
deleted file mode 100644
index 779b01b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int16Vector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class Int16Vector extends AbstractFixedValueVector<Int16Vector>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Int16Vector.class);
-  
-  private final MaterializedField field;
-
-  public Int16Vector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, 32);
-    this.field = new MaterializedField(fieldId, DataType.INT16, false, ValueMode.VECTOR, this.getClass());
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  public final void set(int index, short value){
-    index*=2;
-    data.setShort(index, value);
-  }
-  
-  public final short get(int index){
-    index*=2;
-    return data.getShort(index);
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
deleted file mode 100644
index d142367..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/Int32Vector.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class Int32Vector extends AbstractFixedValueVector<Int32Vector>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Int32Vector.class);
-  
-  private final MaterializedField field;
-
-  public Int32Vector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, 32);
-    this.field = new MaterializedField(fieldId, DataType.INT32, false, ValueMode.VECTOR, this.getClass());
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  public final void set(int index, int value){
-    index*=4;
-    data.setInt(index, value);
-  }
-  
-  public final int get(int index){
-    index*=4;
-    return data.getInt(index);
-  }
-  
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
new file mode 100644
index 0000000..cc18538
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableFixed4.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/** Nullable vector type built on NullableValueVector with Fixed4 as the wrapped value vector. */
+public final class NullableFixed4 extends NullableValueVector<NullableFixed4, Fixed4>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableFixed4.class);
+
+  /** Forwards the field and allocator to the parent constructor along with this class's own Class token. */
+  public NullableFixed4(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator, NullableFixed4.class);
+  }
+
+  /** Builds the wrapped Fixed4; null is passed in the field position, so no MaterializedField is handed to it. */
+  @Override
+  protected Fixed4 getNewValueVector(BufferAllocator allocator) {
+    return new Fixed4(null, allocator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
deleted file mode 100644
index 372de13..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableInt32Vector.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public final class NullableInt32Vector extends NullableValueVector<NullableInt32Vector, Int32Vector>{
-
-  public NullableInt32Vector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, NullableInt32Vector.class);
-  }
-
-
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableInt32Vector.class);
-  
-  
-  public int get(int index){
-    return this.value.get(index);
-  }
-  
-  public void set(int index, int value){
-    this.value.set(index, value);
-  }
-
-
-  @Override
-  protected Int32Vector getNewValueVector(int fieldId, BufferAllocator allocator) {
-    return new Int32Vector(fieldId, allocator);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
index 8e714ed..692ab87 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/NullableValueVector.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.record.vector;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.MaterializedField;
 
 /**
@@ -28,18 +29,16 @@ import org.apache.drill.exec.record.MaterializedField;
 abstract class NullableValueVector<T extends NullableValueVector<T, E>, E extends BaseValueVector<E>> extends BaseValueVector<T> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NullableValueVector.class);
 
-  protected BitVector bits;
+  protected Bit bits;
   protected E value;
-  private final MaterializedField field;
 
-  public NullableValueVector(int fieldId, BufferAllocator allocator, Class<T> valueClass) {
-    super(fieldId, allocator);
-    bits = new BitVector(fieldId, allocator);
-    value = getNewValueVector(fieldId, allocator);
-    this.field = value.getField().getNullableVersion(valueClass);
+  public NullableValueVector(MaterializedField field, BufferAllocator allocator, Class<T> valueClass) { // valueClass is accepted but never read in this body
+    super(field, allocator);
+    bits = new Bit(null, allocator); // per-record bit vector (read by isNull); created with a null field argument
+    value = getNewValueVector(allocator); // subclass supplies the concrete value vector; NOTE(review): this calls an overridable method from a constructor
   }
   
-  protected abstract E getNewValueVector(int fieldId, BufferAllocator allocator);
+  protected abstract E getNewValueVector(BufferAllocator allocator); // invoked from the constructor to populate the value field
 
   public int isNull(int index){
     return bits.getBit(index);
@@ -76,5 +75,26 @@ abstract class NullableValueVector<T extends NullableValueVector<T, E>, E extend
   }
 
   
+  @Override
+  public ByteBuf[] getBuffers() { // returns the data buffers of both child vectors in a new two-element array
+    return new ByteBuf[]{bits.data, value.data}; // order: bits buffer first, value buffer second
+  }
+
+  @Override
+  public void setRecordCount(int recordCount) { // applies one record count to this vector and to both child vectors
+    super.setRecordCount(recordCount);
+    bits.setRecordCount(recordCount);
+    value.setRecordCount(recordCount);
+  }
+
+  /**
+   * Returns null when isNull(index) yields 0; otherwise returns the wrapped vector's object for that index.
+   * NOTE(review): a 0 result is what maps to null here - confirm that matches the bit convention isNull relies on.
+   */
+  @Override
+  public Object getObject(int index) {
+    return isNull(index) == 0 ? null : value.getObject(index);
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
new file mode 100644
index 0000000..2c08551
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/RepeatMap.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import io.netty.buffer.ByteBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/** BaseValueVector subclass named RepeatMap; apart from getAllocationSize, its overridden methods have empty or constant bodies. */
+public class RepeatMap extends BaseValueVector<RepeatMap>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RepeatMap.class);
+
+  /** Passes the field and allocator straight through to BaseValueVector. */
+  public RepeatMap(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  /** Requests four units per value; presumably bytes, i.e. one 4-byte entry per record - TODO confirm. */
+  @Override
+  protected int getAllocationSize(int valueCount) {
+    return 4 * valueCount;
+  }
+
+  // The three overrides that follow all have empty bodies in this class.
+  @Override
+  protected void childResetAllocation(int valueCount, ByteBuf buf) {
+  }
+
+  @Override
+  protected void childCloneMetadata(RepeatMap other) {
+  }
+
+  @Override
+  protected void childClear() {
+  }
+
+  @Override
+  public Object getObject(int index) {
+    return null; // every index yields null; no per-record object is built
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
index e9faa93..323b55f 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/SelectionVector.java
@@ -20,12 +20,16 @@ package org.apache.drill.exec.record.vector;
 import io.netty.buffer.ByteBufAllocator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
 
-public class SelectionVector extends UInt16Vector{
+/**
+ * Fixed2 subclass that adds no members beyond a logger and a pass-through constructor; the separate type exists for convenience and clarity.
+ */
+public class SelectionVector extends Fixed2{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SelectionVector.class);
 
-  public SelectionVector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator);
+  public SelectionVector(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
new file mode 100644
index 0000000..8e89c41
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/TypeHelper.java
@@ -0,0 +1,250 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.SchemaDefProtos.DataMode;
+import org.apache.drill.exec.proto.SchemaDefProtos.MajorType;
+import org.apache.drill.exec.proto.SchemaDefProtos.MinorType;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class TypeHelper {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TypeHelper.class);
+  
+  private static final int WIDTH_ESTIMATE_1 = 10; // added by getSize to the leading 1 for VARCHAR1 and VARBINARY1
+  private static final int WIDTH_ESTIMATE_2 = 50000; // added by getSize to the leading 2 for VARCHAR2, VARBINARY2, PROTO2 and MSGPACK2
+  private static final int WIDTH_ESTIMATE_4 = 1024*1024; // added by getSize to the leading 4 for VARCHAR4, VARBINARY4, PROTO4 and MSGPACK4
+  
+  /**
+   * Returns the per-value width in bytes for a type.
+   *
+   * <p>Fixed-width types report their exact width, FIXEDCHAR and FIXEDBINARY report the declared width of
+   * the type, and variable-length types report a constant 1, 2 or 4 (presumably the length-prefix size)
+   * plus the matching WIDTH_ESTIMATE_n guess. Minor types that are not listed fall back to 4.
+   *
+   * @param major type to size; only its minor type and, for the FIXED* types, its width are read
+   * @return the width in bytes; BOOLEAN is rounded up to one whole byte
+   */
+  public static int getSize(MajorType major){
+    switch(major.getMinorType()){
+    case TINYINT: return 1;
+    case SMALLINT: return 2;
+    case INT: return 4;
+    case BIGINT: return 8;
+    case DECIMAL4: return 4;
+    case DECIMAL8: return 8;
+    case DECIMAL12: return 12;
+    case DECIMAL16: return 16;
+    case MONEY: return 8;
+    case DATE: return 4;
+    case TIME: return 8;
+    case TIMETZ: return 12;
+    case TIMESTAMP: return 8;
+    case DATETIME: return 8;
+    case INTERVAL: return 12;
+    case FLOAT4: return 4;
+    case FLOAT8: return 8;
+    // A boolean needs one bit, but the int expression 1/8 truncates to 0 and reported
+    // a zero-width value. Round up to a whole byte instead.
+    case BOOLEAN: return 1;
+    case FIXEDCHAR: return major.getWidth();
+    case VARCHAR1: return 1 + WIDTH_ESTIMATE_1;
+    case VARCHAR2: return 2 + WIDTH_ESTIMATE_2;
+    case VARCHAR4: return 4 + WIDTH_ESTIMATE_4;
+    case FIXEDBINARY: return major.getWidth();
+    case VARBINARY1: return 1 + WIDTH_ESTIMATE_1;
+    case VARBINARY2: return 2 + WIDTH_ESTIMATE_2;
+    case VARBINARY4: return 4 + WIDTH_ESTIMATE_4;
+    case UINT1: return 1;
+    case UINT2: return 2;
+    case UINT4: return 4;
+    case UINT8: return 8;
+    case PROTO2: return 2 + WIDTH_ESTIMATE_2;
+    case PROTO4: return 4 + WIDTH_ESTIMATE_4;
+    case MSGPACK2: return 2 + WIDTH_ESTIMATE_2;
+    case MSGPACK4: return 4 + WIDTH_ESTIMATE_4;
+    }
+    // Any minor type without an explicit case is treated as 4 bytes wide.
+    return 4;
+  }
+  
+  /**
+   * Looks up the value vector implementation class for a minor type and data mode.
+   *
+   * <p>REQUIRED and OPTIONAL both resolve to the non-nullable vector classes. REQUIRED previously had no
+   * active mapping and always threw, even though getNewVector builds exactly these classes for REQUIRED
+   * fields. OPTIONAL keeps the mapping it already had, so existing callers see no change.
+   *
+   * <p>NOTE(review): OPTIONAL presumably should resolve to Nullable* classes and REPEATED to Repeated*
+   * classes (same table with the class name prefixed) once those implementations exist - confirm.
+   *
+   * @param type minor type to look up
+   * @param mode data mode of the field
+   * @return the vector class registered for the combination
+   * @throws UnsupportedOperationException if no class is mapped for the combination
+   */
+  public static Class<?> getValueVectorClass(MinorType type, DataMode mode){
+    switch(mode){
+    case REQUIRED:
+    case OPTIONAL:
+      switch(type){
+        case REPEATMAP: return RepeatMap.class;
+        case TINYINT: return Fixed1.class;
+        case SMALLINT: return Fixed2.class;
+        case INT: return Fixed4.class;
+        case BIGINT: return Fixed8.class;
+        case DECIMAL4: return Fixed4.class;
+        case DECIMAL8: return Fixed8.class;
+        case DECIMAL12: return Fixed12.class;
+        case DECIMAL16: return Fixed16.class;
+        case MONEY: return Fixed8.class;
+        case DATE: return Fixed4.class;
+        case TIME: return Fixed8.class;
+        case TIMETZ: return Fixed12.class;
+        case TIMESTAMP: return Fixed8.class;
+        case DATETIME: return Fixed8.class;
+        case INTERVAL: return Fixed12.class;
+        case FLOAT4: return Fixed4.class;
+        case FLOAT8: return Fixed8.class;
+        case BOOLEAN: return Bit.class;
+        case FIXEDCHAR: return FixedLen.class;
+        case VARCHAR1: return VarLen1.class;
+        case VARCHAR2: return VarLen2.class;
+        case VARCHAR4: return VarLen4.class;
+        case FIXEDBINARY: return FixedLen.class;
+        case VARBINARY1: return VarLen1.class;
+        case VARBINARY2: return VarLen2.class;
+        case VARBINARY4: return VarLen4.class;
+        case UINT1: return Fixed1.class;
+        case UINT2: return Fixed2.class;
+        case UINT4: return Fixed4.class;
+        case UINT8: return Fixed8.class;
+        case PROTO2: return VarLen2.class;
+        case PROTO4: return VarLen4.class;
+        case MSGPACK2: return VarLen2.class;
+        case MSGPACK4: return VarLen4.class;
+      }
+      break;
+    case REPEATED:
+      // TODO: map to the Repeated* vector classes once they are implemented.
+      break;
+    default:
+      break;
+    }
+    throw new UnsupportedOperationException(
+        "No value vector class is mapped for minor type " + type + " with data mode " + mode);
+  }
+  
+  
+  /**
+   * Creates a new, empty value vector for the given field.
+   *
+   * <p>Only REQUIRED fields are supported; the concrete vector class is chosen from the field's minor type.
+   *
+   * @param field field whose major type selects the vector implementation; passed on to the new vector
+   * @param allocator allocator handed to the new vector
+   * @return a newly constructed vector for the field
+   * @throws UnsupportedOperationException if the field's mode is not REQUIRED or its minor type has no mapping
+   */
+  public static ValueVector<?> getNewVector(MaterializedField field, BufferAllocator allocator){
+    MajorType type = field.getType();
+    switch(type.getMode()){
+    case REQUIRED:
+      switch(type.getMinorType()){
+      case TINYINT: return new Fixed1(field, allocator);
+      case SMALLINT: return new Fixed2(field, allocator);
+      case INT: return new Fixed4(field, allocator);
+      case BIGINT: return new Fixed8(field, allocator);
+      case DECIMAL4: return new Fixed4(field, allocator);
+      case DECIMAL8: return new Fixed8(field, allocator);
+      case DECIMAL12: return new Fixed12(field, allocator);
+      case DECIMAL16: return new Fixed16(field, allocator);
+      case MONEY: return new Fixed8(field, allocator);
+      case DATE: return new Fixed4(field, allocator);
+      case TIME: return new Fixed8(field, allocator);
+      case TIMETZ: return new Fixed12(field, allocator);
+      case TIMESTAMP: return new Fixed8(field, allocator);
+      case DATETIME: return new Fixed8(field, allocator);
+      case INTERVAL: return new Fixed12(field, allocator);
+      case FLOAT4: return new Fixed4(field, allocator);
+      case FLOAT8: return new Fixed8(field, allocator);
+      case BOOLEAN: return new Bit(field, allocator);
+      case FIXEDCHAR: return new FixedLen(field, allocator);
+      case VARCHAR1: return new VarLen1(field, allocator);
+      case VARCHAR2: return new VarLen2(field, allocator);
+      case VARCHAR4: return new VarLen4(field, allocator);
+      case FIXEDBINARY: return new FixedLen(field, allocator);
+      case VARBINARY1: return new VarLen1(field, allocator);
+      case VARBINARY2: return new VarLen2(field, allocator);
+      case VARBINARY4: return new VarLen4(field, allocator);
+      case UINT1: return new Fixed1(field, allocator);
+      case UINT2: return new Fixed2(field, allocator);
+      case UINT4: return new Fixed4(field, allocator);
+      case UINT8: return new Fixed8(field, allocator);
+      case PROTO2: return new VarLen2(field, allocator);
+      case PROTO4: return new VarLen4(field, allocator);
+      case MSGPACK2: return new VarLen2(field, allocator);
+      case MSGPACK4: return new VarLen4(field, allocator);
+      }
+      break;
+    case REPEATED:
+    case OPTIONAL:
+    default:
+      // No vector implementations are wired up for these modes here.
+      break;
+    }
+    // Name the unsupported combination so the failure can be diagnosed from the stack trace alone.
+    throw new UnsupportedOperationException(
+        "No value vector implementation for minor type " + type.getMinorType() + " with data mode " + type.getMode());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
deleted file mode 100644
index 87c306b..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/UInt16Vector.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.record.vector;
-
-import org.apache.drill.common.expression.types.DataType;
-import org.apache.drill.common.physical.RecordField.ValueMode;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-
-public class UInt16Vector extends AbstractFixedValueVector<Int32Vector>{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UInt16Vector.class);
-  
-  private final MaterializedField field;
-
-  public UInt16Vector(int fieldId, BufferAllocator allocator) {
-    super(fieldId, allocator, 16);
-    this.field = new MaterializedField(fieldId, DataType.UINT16, false, ValueMode.VECTOR, this.getClass());
-  }
-
-  @Override
-  public MaterializedField getField() {
-    return field;
-  }
-
-  public final void set(int index, char value){
-    index*=2;
-    data.setChar(index, value);
-  }
-  
-  public final char get(int index){
-    index*=2;
-    return data.getChar(index);
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
index 76b0e90..8a5a822 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/ValueVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.io.Closeable;
 
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.MaterializedField;
 
 /**
@@ -44,6 +45,13 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
   public abstract void allocateNew(int valueCount);
 
   /**
+   * Update the value vector to the provided record information.
+   * @param metadata the field metadata to apply to this vector
+   * @param data the buffer supplied together with that metadata
+   */
+  public abstract void setTo(FieldMetadata metadata, ByteBuf data);
+  
+  /**
    * Zero copy move of data from this vector to the target vector. Any future access to this vector without being
    * populated by a new vector will cause problems.
    * 
@@ -52,19 +60,19 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
   public abstract void transferTo(T vector);
 
   /**
-   * Return the underlying buffer. Note that this doesn't impact the reference counts for this buffer so it only should be
+   * Return the underlying buffers associated with this vector. Note that this doesn't impact the reference counts for these buffers, so it should only be
    * used for in context access. Also note that this buffer changes regularly thus external classes shouldn't hold a
-   * reference to it.
+   * reference to it (unless they change it).
    * 
    * @return The underlying ByteBuf.
    */
-  public abstract ByteBuf getBuffer();
+  public abstract ByteBuf[] getBuffers();
 
   /**
-   * Returns the number of value contained within this vector.
+   * Returns the maximum number of values contained within this vector.
    * @return Vector size
    */
-  public abstract int size();
+  public abstract int capacity();
 
 
   /**
@@ -79,4 +87,32 @@ public interface ValueVector<T extends ValueVector<T>> extends Closeable {
    */
   public abstract MaterializedField getField();
 
+  /**
+   * Define the number of records that are in this value vector.
+   * @param recordCount Number of records active in this vector.  Used for purposes such as getting a writable range of the data.
+   */
+  public abstract void setRecordCount(int recordCount);
+  public abstract int getRecordCount(); // presumably reports the count defined through setRecordCount - TODO confirm
+  
+  
+  /**
+   * Get the metadata for this field.
+   * @return the FieldMetadata for this vector's field
+   */
+  public abstract FieldMetadata getMetadata();
+  
+  /**
+   * Debug interface to get values per record.
+   * @param index The record index.
+   * @return The value in the vector.
+   */
+  public Object getObject(int index);
+  
+  
+  /**
+   * Useful for generating random data.
+   */
+  public void randomizeData();
+    
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
new file mode 100644
index 0000000..d87029d
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen1.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+/** VariableVector specialization that uses a Fixed1 vector as its length vector. */
+public class VarLen1 extends VariableVector<VarLen1, Fixed1>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen1.class);
+  /** Passes the field and allocator straight through to VariableVector. */
+  public VarLen1(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  /** Builds the Fixed1 length vector; null is passed in the field position, so no MaterializedField is handed to it. */
+  @Override
+  protected Fixed1 getNewLengthVector(BufferAllocator allocator) {
+    return new Fixed1(null, allocator);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
new file mode 100644
index 0000000..ebd440a
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen2.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class VarLen2 extends VariableVector<VarLen2, Fixed2>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen2.class);
+
+  public VarLen2(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  @Override
+  protected Fixed2 getNewLengthVector(BufferAllocator allocator) {
+    return new Fixed2(null, allocator);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
new file mode 100644
index 0000000..b3cd712
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VarLen4.java
@@ -0,0 +1,36 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.record.vector;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.MaterializedField;
+
+public class VarLen4 extends VariableVector<VarLen4, Fixed4>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VarLen4.class);
+
+  public VarLen4(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+  }
+
+  @Override
+  protected Fixed4 getNewLengthVector(BufferAllocator allocator) {
+    return new Fixed4(null, allocator);
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
index dd84c94..4247f14 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/record/vector/VariableVector.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.record.DeadBuf;
+import org.apache.drill.exec.record.MaterializedField;
 
 /** 
  * A vector of variable length bytes.  Constructed as a vector of lengths or positions and a vector of values.  Random access is only possible if the variable vector stores positions as opposed to lengths.
@@ -29,18 +30,16 @@ public abstract class VariableVector<T extends VariableVector<T, E>, E extends B
 
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VariableVector.class);
   
-  protected E lengthVector;
+  protected final E lengthVector;
   private ByteBuf values = DeadBuf.DEAD_BUFFER;
   protected int expectedValueLength;
-  private final boolean hasPositions;
   
-  public VariableVector(int fieldId, BufferAllocator allocator, boolean hasPositions) {
-    super(fieldId, allocator);
-    this.lengthVector = getNewLengthVector(fieldId, allocator);
-    this.hasPositions = hasPositions;
+  public VariableVector(MaterializedField field, BufferAllocator allocator) {
+    super(field, allocator);
+    this.lengthVector = getNewLengthVector(allocator);
   }
   
-  protected abstract E getNewLengthVector(int fieldId, BufferAllocator allocator);
+  protected abstract E getNewLengthVector(BufferAllocator allocator);
   
   @Override
   protected int getAllocationSize(int valueCount) {
@@ -67,12 +66,28 @@ public abstract class VariableVector<T extends VariableVector<T, E>, E extends B
       values.release();
       values = DeadBuf.DEAD_BUFFER;
     }
-  }  
+  }
+
   
-  public boolean hasPositions(){
-    return hasPositions;
+  @Override
+  public ByteBuf[] getBuffers() {
+    return new ByteBuf[]{lengthVector.data, values};
   }
+
+  @Override
+  public void setRecordCount(int recordCount) {
+    super.setRecordCount(recordCount);
+    lengthVector.setRecordCount(recordCount);
+  }  
   
+  public void setTotalBytes(int totalBytes){
+    values.writerIndex(totalBytes);
+  }
+
+  @Override
+  public Object getObject(int index) {
+    return null;
+  }
   
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
new file mode 100644
index 0000000..859d385
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/AbstractHandshakeHandler.java
@@ -0,0 +1,57 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundMessageHandlerAdapter;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+
+public abstract class AbstractHandshakeHandler<T extends MessageLite> extends
+    ChannelInboundMessageHandlerAdapter<InboundRpcMessage> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractHandshakeHandler.class);
+
+  protected final EnumLite handshakeType;
+  protected final Parser<T> parser;
+  protected int coordinationId;
+
+  public AbstractHandshakeHandler(EnumLite handshakeType, Parser<T> parser) {
+    super();
+    this.handshakeType = handshakeType;
+    this.parser = parser;
+  }
+
+  @Override
+  public final void messageReceived(ChannelHandlerContext ctx, InboundRpcMessage inbound) throws Exception {
+    coordinationId = inbound.coordinationId;
+    ctx.channel().pipeline().remove(this);
+    if (inbound.rpcType != handshakeType.getNumber())
+      throw new RpcException(String.format("Handshake failure.  Expected %s[%d] but received number [%d]",
+          handshakeType, handshakeType.getNumber(), inbound.rpcType));
+  
+    T msg = parser.parseFrom(inbound.getProtobufBodyAsIS());
+    consumeHandshake(ctx.channel(), msg);
+    
+  }
+
+  protected abstract void consumeHandshake(Channel c, T msg) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
new file mode 100644
index 0000000..a241880
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/Acks.java
@@ -0,0 +1,27 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+
+public class Acks {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Acks.class);
+  
+  public static final Ack OK = Ack.newBuilder().setOk(true).build();
+  public static final Ack FAIL = Ack.newBuilder().setOk(false).build();
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index c62d445..0ff2b9d 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -18,23 +18,34 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.concurrent.GenericFutureListener;
 
+import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
 
-public abstract class BasicClient<T extends EnumLite> extends RpcBus<T> {
+public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection> extends RpcBus<T, R> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
 
   private Bootstrap b;
   private volatile boolean connect = false;
+  protected R connection;
+  private EventLoopGroup eventLoop;
 
-  public BasicClient(ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+  public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+    super(rpcMapping);
+    this.eventLoop = eventLoopGroup;
+    
     b = new Bootstrap() //
         .group(eventLoopGroup) //
         .channel(NioSocketChannel.class) //
@@ -42,40 +53,132 @@ public abstract class BasicClient<T extends EnumLite> extends RpcBus<T> {
         .option(ChannelOption.SO_RCVBUF, 1 << 17) //
         .option(ChannelOption.SO_SNDBUF, 1 << 17) //
         .handler(new ChannelInitializer<SocketChannel>() {
-          
+
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
-            ch.closeFuture().addListener(getCloseHandler(ch));
-            
+            logger.debug("initializing client connection.");
+            connection = initRemoteConnection(ch);
+            ch.closeFuture().addListener(getCloseHandler(connection));
+
             ch.pipeline().addLast( //
                 new ZeroCopyProtobufLengthDecoder(), //
-                new RpcDecoder(), //
-                new RpcEncoder(), //
-                new InboundHandler(ch), //
+                new RpcDecoder(rpcConfig.getName()), //
+                new RpcEncoder(rpcConfig.getName()), //
+                getHandshakeHandler(), //
+                new InboundHandler(connection), //
                 new RpcExceptionHandler() //
                 );
-            channel = ch;
             connect = true;
           }
         }) //
-        
-        ;
+
+    ;
+  }
+
+  protected abstract ClientHandshakeHandler<?> getHandshakeHandler();
+
+  protected abstract class ClientHandshakeHandler<T extends MessageLite> extends AbstractHandshakeHandler<T> {
+    private Class<T> responseType;
+
+    public ClientHandshakeHandler(EnumLite handshakeType, Class<T> responseType, Parser<T> parser) {
+      super(handshakeType, parser);
+      this.responseType = responseType;
+    }
+
+    @Override
+    protected final void consumeHandshake(Channel c, T msg) throws Exception {
+      validateHandshake(msg);
+      queue.getFuture(handshakeType.getNumber(), coordinationId, responseType).setValue(msg);
+    }
+
+    protected abstract void validateHandshake(T msg) throws Exception;
+
+  }
+
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(Channel channel) {
+    return new ChannelClosedHandler();
+  }
+
+  protected final <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFutureImpl<RECEIVE> send(
+      T connection, T rpcType, SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+    throw new UnsupportedOperationException(
+        "This shouldn't be used in client mode as a client only has a single connection.");
+  }
+
+  protected <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(T rpcType,
+      SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) throws RpcException {
+    return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
   }
 
   @Override
   public boolean isClient() {
     return true;
   }
-  
-  public ChannelFuture connectAsClient(String host, int port) throws InterruptedException {
-    ChannelFuture f = b.connect(host, port).sync();
-    connect = !connect;
-    return f;
+
+  /**
+   * TODO: This is a horrible hack to manage deadlock caused by creation of BitClient within BitCom.  Should be cleaned up.
+   */
+  private class HandshakeThread<SEND extends MessageLite, RECEIVE extends MessageLite> extends Thread {
+    final SettableFuture<RECEIVE> future;
+    T handshakeType;
+    SEND handshakeValue;
+    String host;
+    int port;
+    Class<RECEIVE> responseClass;
+
+    public HandshakeThread(T handshakeType, SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) {
+      super();
+      assert host != null && !host.isEmpty();
+      assert port > 0;
+      logger.debug("Creating new handshake thread to connect to {}:{}", host, port);
+      this.setName(String.format("handshake thread for %s", handshakeType.getClass().getCanonicalName()));
+      future = SettableFuture.create();
+      this.handshakeType = handshakeType;
+      this.handshakeValue = handshakeValue;
+      this.host = host;
+      this.port = port;
+      this.responseClass = responseClass;
+    }
+
+    @Override
+    public void run() {
+      try {
+        logger.debug("Starting to get client connection on host {}, port {}.", host, port);
+        
+        ChannelFuture f = b.connect(host, port);
+        f.sync();
+        if (connection == null) throw new RpcException("Failure while attempting to connect to server.");
+        connect = !connect;
+        logger.debug("Client connected, sending handshake.");
+        DrillRpcFuture<RECEIVE> fut = send(handshakeType, handshakeValue, responseClass);
+        future.set(fut.checkedGet());
+        logger.debug("Got bit client connection.");
+      } catch (Exception e) {
+        logger.debug("Failed to get client connection.", e);
+        future.setException(e);
+      }
+    }
+
+  }
+
+  protected <SEND extends MessageLite, RECEIVE extends MessageLite> RECEIVE connectAsClient(T handshakeType,
+      SEND handshakeValue, String host, int port, Class<RECEIVE> responseClass) throws InterruptedException,
+      RpcException {
+    
+    
+    HandshakeThread<SEND, RECEIVE> ht = new HandshakeThread<SEND, RECEIVE>(handshakeType, handshakeValue, host, port, responseClass);
+    ht.start();
+    try{
+      return ht.future.get();  
+    }catch(Exception e){
+      throw new RpcException(e);
+    }
+    
   }
 
   public void close() {
     logger.debug("Closing client");
-    b.shutdown();
+    connection.getChannel().close();
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e57a8d6d/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
new file mode 100644
index 0000000..0e62f14
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -0,0 +1,64 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.rpc;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.concurrent.GenericFutureListener;
+
+import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
+
+import com.google.protobuf.Internal.EnumLite;
+
+public abstract class BasicClientWithConnection<T extends EnumLite> extends BasicClient<T, ServerConnection>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClientWithConnection.class);
+
+  public BasicClientWithConnection(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
+    super(rpcMapping, alloc, eventLoopGroup);
+  }
+  
+  @Override
+  protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
+    return getCloseHandler(clientConnection.getChannel());
+  }
+  
+  @Override
+  protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+    return handle(rpcType, pBody, dBody);
+  }
+  
+  protected abstract Response handle(int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException ;
+    
+  @Override
+  public ServerConnection initRemoteConnection(Channel channel) {
+    return new ServerConnection(channel);
+  }
+
+  public static class ServerConnection extends RemoteConnection{
+
+    public ServerConnection(Channel channel) {
+      super(channel);
+    }
+
+  }
+
+  
+}