Posted to dev@drill.apache.org by ja...@apache.org on 2013/08/16 03:44:47 UTC

[09/27] Initial Parquet commit. Supports INT, LONG, FLOAT, DOUBLE, distributed scheduling.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
new file mode 100644
index 0000000..2ad7b44
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -0,0 +1,403 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.vector.TypeHelper;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class ParquetRecordReader implements RecordReader {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
+
+  // this multiplier can be raised to read in multiple value vectors' worth of data at once, and then
+  // break them up into smaller vectors
+  private static final int NUMBER_OF_VECTORS = 1;
+  private static final long DEFAULT_BATCH_LENGTH = 256 * 1024 * NUMBER_OF_VECTORS; // 256kb
+  private static final long DEFAULT_BATCH_LENGTH_IN_BITS = DEFAULT_BATCH_LENGTH * 8; // 256kb expressed in bits
+
+  // TODO - should probably find a smarter way to set this, currently 2 megabytes
+  private static final int VAR_LEN_FIELD_LENGTH = 1024 * 1024 * 2;
+  public static final int PARQUET_PAGE_MAX_SIZE = 1024 * 1024 * 5;
+  private static final String SEPERATOR = System.getProperty("file.separator");
+
+
+  // used for clearing the last n bits of a byte
+  public static final byte[] endBitMasks = {-2, -4, -8, -16, -32, -64, -128};
+  // used for clearing the first n bits of a byte
+  public static final byte[] startBitMasks = {127, 63, 31, 15, 7, 3, 1};
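+  // for example, (byte) (b & endBitMasks[2]) clears the last 3 bits of b, while
+  // (byte) (b & startBitMasks[2]) clears the first 3 bits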
+
+  private int bitWidthAllFixedFields;
+  private boolean allFieldsFixedLength;
+  private int recordsPerBatch;
+  private ByteBuf bufferWithAllData;
+  long totalRecords;
+  long rowGroupOffset;
+
+  private List<ColumnReader> columnStatuses;
+  FileSystem fileSystem;
+  private BufferAllocator allocator;
+  private long batchSize;
+  Path hadoopPath;
+  private final VarLenBinaryReader varLengthReader;
+
+  public CodecFactoryExposer getCodecFactoryExposer() {
+    return codecFactoryExposer;
+  }
+
+  private final CodecFactoryExposer codecFactoryExposer;
+
+  int rowGroupIndex;
+
+  public ParquetRecordReader(FragmentContext fragmentContext,
+                             String path, int rowGroupIndex, FileSystem fs,
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer) throws ExecutionSetupException {
+    this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, path, rowGroupIndex, fs, codecFactoryExposer, footer);
+  }
+
+
+  public ParquetRecordReader(FragmentContext fragmentContext, long batchSize,
+                             String path, int rowGroupIndex, FileSystem fs,
+                             CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer) throws ExecutionSetupException {
+    this.allocator = fragmentContext.getAllocator();
+
+    hadoopPath = new Path(path);
+    fileSystem = fs;
+    this.codecFactoryExposer = codecFactoryExposer;
+    this.rowGroupIndex = rowGroupIndex;
+    this.batchSize = batchSize;
+
+    columnStatuses = new ArrayList<>();
+
+    totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+    allFieldsFixedLength = true;
+    ColumnDescriptor column;
+    ColumnChunkMetaData columnChunkMetaData;
+
+    // loop to add up the length of the fixed width columns and build the schema
+    for (int i = 0; i < columns.size(); ++i) {
+      column = columns.get(i);
+
+      // sum the lengths of all of the fixed length fields
+      if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+        // There is no support for the fixed binary type yet in parquet; leaving a task here as a reminder
+        // TODO - implement this when the feature is added upstream
+//          if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY){
+//              byteWidthAllFixedFields += column.getType().getWidth()
+//          }
+//          else { } // the code below for the rest of the fixed length fields
+
+        bitWidthAllFixedFields += getTypeLengthInBits(column.getType());
+      } else {
+        allFieldsFixedLength = false;
+      }
+
+    }
+    rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+
+    if (allFieldsFixedLength) {
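+      // batchSize is expressed in bits, so dividing by the combined bit width of all fixed width
+      // fields gives the number of records that fit in one batch (capped at the row group's value count)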
+      recordsPerBatch = (int) Math.min(batchSize / bitWidthAllFixedFields, footer.getBlocks().get(0).getColumns().get(0).getValueCount());
+    }
+    try {
+      ArrayList<VarLenBinaryReader.VarLengthColumn> varLengthColumns = new ArrayList<>();
+      // initialize all of the column read status objects
+      boolean fieldFixedLength = false;
+      MaterializedField field;
+      for (int i = 0; i < columns.size(); ++i) {
+        column = columns.get(i);
+        columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
+        field = MaterializedField.create(new SchemaPath(toFieldName(column.getPath()), ExpressionPosition.UNKNOWN),
+            toMajorType(column.getType(), getDataMode(column, footer.getFileMetaData().getSchema())));
+        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
+        ValueVector v = TypeHelper.getNewVector(field, allocator);
+        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
+          createFixedColumnReader(fieldFixedLength, field, column, columnChunkMetaData, recordsPerBatch, v);
+        } else {
+          varLengthColumns.add(new VarLenBinaryReader.VarLengthColumn(this, -1, column, columnChunkMetaData, false, v));
+        }
+      }
+      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
+    } catch (SchemaChangeException e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  public ByteBuf getBufferWithAllData() {
+    return bufferWithAllData;
+  }
+
+  public int getRowGroupIndex() {
+    return rowGroupIndex;
+  }
+
+  public int getBitWidthAllFixedFields() {
+    return bitWidthAllFixedFields;
+  }
+
+  public long getBatchSize() {
+    return batchSize;
+  }
+
+  /**
+   * @param type a fixed length type from the parquet library enum
+   * @return the length of the type in bits
+   */
+  public static int getTypeLengthInBits(PrimitiveType.PrimitiveTypeName type) {
+    switch (type) {
+      case INT64:   return 64;
+      case INT32:   return 32;
+      case BOOLEAN: return 1;
+      case FLOAT:   return 32;
+      case DOUBLE:  return 64;
+      case INT96:   return 96;
+      // binary and fixed length byte array
+      default:
+        throw new IllegalStateException("Length cannot be determined for type " + type);
+    }
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+    long tA = System.nanoTime(), tB;
+    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ParquetRecordReader.setup");
+    output.removeAllFields();
+
+    try {
+      for (ColumnReader crs : columnStatuses) {
+        output.addField(crs.valueVecHolder.getValueVector());
+      }
+      for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
+        output.addField(r.valueVecHolder.getValueVector());
+        output.setNewSchema();
+      }
+    }catch(SchemaChangeException e) {
+      throw new ExecutionSetupException("Error setting up output mutator.", e);
+    }
+
+    // the utility method for reading a stream into a ByteBuf copies all of the data into one giant
+    // heap buffer; here we do the same copy in a loop with a small buffer to avoid allocating that
+    // much on the heap at once
+
+    // TODO - this should be replaced by an enhancement in Hadoop 2.0 that will allow reading
+    // directly into a ByteBuf passed into the reading method
+    int totalByteLength = 0;
+    long start = 0;
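+    // the first row group starts right after the 4-byte "PAR1" magic number at the beginning of the
+    // file, so those bytes are included in the read; for any other row group, seek straight to its
+    // first data page offset recorded in the footer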
+    if (rowGroupIndex == 0){
+      totalByteLength = 4;
+    }
+    else{
+      start = rowGroupOffset;
+    }
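+    // the backing buffer needs to hold every column chunk in this row group, so sum their on-disk sizes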
+    for (ColumnReader crs : columnStatuses){
+      totalByteLength += crs.columnChunkMetaData.getTotalSize();
+    }
+    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+      totalByteLength += r.columnChunkMetaData.getTotalSize();
+    }
+    int bufferSize = 64*1024;
+    long totalBytesWritten = 0;
+    int validBytesInCurrentBuffer;
+    byte[] buffer = new byte[bufferSize];
+    try {
+      bufferWithAllData = allocator.buffer(totalByteLength);
+      FSDataInputStream inputStream = fileSystem.open(hadoopPath);
+      inputStream.seek(start);
+      while (totalBytesWritten < totalByteLength){
+        validBytesInCurrentBuffer = (int) Math.min(bufferSize, totalByteLength - totalBytesWritten);
+        // readFully is used because a plain read() may return fewer bytes than requested
+        inputStream.readFully(buffer, 0, validBytesInCurrentBuffer);
+        bufferWithAllData.writeBytes(buffer, 0, validBytesInCurrentBuffer);
+        totalBytesWritten += validBytesInCurrentBuffer;
+      }
+
+    } catch (IOException e) {
+      throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + hadoopPath.getName());
+    }
+    System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
+  }
+
+  private static String toFieldName(String[] paths) {
+    return join(SEPERATOR, paths);
+  }
+
+  private TypeProtos.DataMode getDataMode(ColumnDescriptor column, MessageType schema) {
+    if (schema.getColumnDescription(column.getPath()).getMaxDefinitionLevel() == 0) {
+      return TypeProtos.DataMode.REQUIRED;
+    } else {
+      return TypeProtos.DataMode.OPTIONAL;
+    }
+  }
+
+  private void resetBatch() {
+    for (ColumnReader column : columnStatuses) {
+      column.valueVecHolder.reset();
+      column.valuesReadInCurrentPass = 0;
+    }
+    for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns){
+      r.valueVecHolder.reset();
+      r.valuesReadInCurrentPass = 0;
+    }
+  }
+
+  /**
+   * @param fixedLength - true if the column holds a fixed width type
+   * @param field - the materialized field describing the column's output vector
+   * @param descriptor - the parquet column descriptor for this column
+   * @param columnChunkMetaData - metadata for this column's chunk in the current row group
+   * @param allocateSize - the size of the vector to create
+   * @param v - the value vector that will receive the column's values
+   * @return - true once a column reader has been created and added to the list of column statuses
+   * @throws SchemaChangeException
+   */
+  private boolean createFixedColumnReader(boolean fixedLength, MaterializedField field, ColumnDescriptor descriptor,
+                                          ColumnChunkMetaData columnChunkMetaData, int allocateSize, ValueVector v)
+      throws SchemaChangeException {
+    TypeProtos.MajorType type = field.getType();
+    if (columnChunkMetaData.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN){
+      columnStatuses.add(new BitReader(this, allocateSize, descriptor, columnChunkMetaData,
+          fixedLength, v));
+    }
+    else{
+      columnStatuses.add(new FixedByteAlignedReader(this, allocateSize, descriptor, columnChunkMetaData,
+          fixedLength, v));
+    }
+    return true;
+  }
+
+ public void readAllFixedFields(long recordsToRead, ColumnReader firstColumnStatus) throws IOException {
+
+   for (ColumnReader crs : columnStatuses){
+     crs.readAllFixedFields(recordsToRead, firstColumnStatus);
+   }
+ }
+
+  @Override
+  public int next() {
+    resetBatch();
+    long recordsToRead = 0;
+    try {
+      ColumnReader firstColumnStatus = columnStatuses.iterator().next();
+      if (allFieldsFixedLength) {
+        recordsToRead = Math.min(recordsPerBatch, firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
+      } else {
+        // arbitrary
+        recordsToRead = 8000;
+
+        // going to incorporate looking at the length of values and copying the data into a single loop;
+        // hopefully it won't get too complicated
+
+        // loop through the variable length data to find the maximum number of records that will fit in this batch
+        // this will be a bit annoying if we want to loop through row groups, columns, pages and then individual
+        // variable length values...
+        // plain-encoded variable length fields are laid out as |length|value|length|value|... with a 4-byte
+        // little-endian length prefix (this is what VarLenBinaryReader.readFields relies on)
+      }
+
+      if (allFieldsFixedLength) {
+        readAllFixedFields(recordsToRead, firstColumnStatus);
+      } else { // variable length columns
+        long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
+        readAllFixedFields(fixedRecordsToRead, firstColumnStatus);
+      }
+
+      return firstColumnStatus.valuesReadInCurrentPass;
+    } catch (IOException e) {
+      throw new DrillRuntimeException(e);
+    }
+  }
+
+  static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName,
+                                               TypeProtos.DataMode mode) {
+    return toMajorType(primitiveTypeName, 0, mode);
+  }
+
+  static TypeProtos.MajorType toMajorType(PrimitiveType.PrimitiveTypeName primitiveTypeName, int length,
+                                               TypeProtos.DataMode mode) {
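+    // note: the mode parameter is currently only applied in the INT96 and FIXED_LEN_BYTE_ARRAY cases
+    // below; the remaining cases always produce REQUIRED types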
+    switch (primitiveTypeName) {
+      case BINARY:
+        return Types.required(TypeProtos.MinorType.VARBINARY);
+      case INT64:
+        return Types.required(TypeProtos.MinorType.BIGINT);
+      case INT32:
+        return Types.required(TypeProtos.MinorType.INT);
+      case BOOLEAN:
+        return Types.required(TypeProtos.MinorType.BIT);
+      case FLOAT:
+        return Types.required(TypeProtos.MinorType.FLOAT4);
+      case DOUBLE:
+        return Types.required(TypeProtos.MinorType.FLOAT8);
+      // Both of these are not supported by the parquet library yet (7/3/13),
+      // but they are declared here for when they are implemented
+      case INT96:
+        return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY).setWidth(12)
+            .setMode(mode).build();
+      case FIXED_LEN_BYTE_ARRAY:
+        checkArgument(length > 0, "A length greater than zero must be provided for a FixedBinary type.");
+        return TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.FIXEDBINARY)
+            .setWidth(length).setMode(mode).build();
+      default:
+        throw new UnsupportedOperationException("Type not supported: " + primitiveTypeName);
+    }
+  }
+
+  static String join(String delimiter, String... str) {
+    StringBuilder builder = new StringBuilder();
+    int i = 0;
+    for (String s : str) {
+      builder.append(s);
+      if (i < str.length - 1) {
+        builder.append(delimiter);
+      }
+      i++;
+    }
+    return builder.toString();
+  }
+
+  @Override
+  public void cleanup() {
+    columnStatuses.clear();
+    bufferWithAllData.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
new file mode 100644
index 0000000..1e5d203
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -0,0 +1,137 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterators;
+import org.apache.drill.common.logical.StorageEngineConfig;
+import org.apache.drill.exec.exception.SetupException;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.ReadEntryFromHDFS;
+import org.apache.drill.exec.physical.base.*;
+import org.apache.drill.exec.store.StorageEngineRegistry;
+
+import java.util.*;
+
+// Class containing information for reading a single parquet row group from HDFS
+@JsonTypeName("parquet-row-group-scan")
+public class ParquetRowGroupScan extends AbstractBase implements SubScan {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRowGroupScan.class);
+
+  public StorageEngineConfig engineConfig;
+  private ParquetStorageEngine parquetStorageEngine;
+  private List<RowGroupReadEntry> rowGroupReadEntries;
+
+  @JsonCreator
+  public ParquetRowGroupScan(@JacksonInject StorageEngineRegistry registry, @JsonProperty("engineConfig") StorageEngineConfig engineConfig,
+                             @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries) throws SetupException {
+    parquetStorageEngine = (ParquetStorageEngine) registry.getEngine(engineConfig);
+    this.rowGroupReadEntries = rowGroupReadEntries;
+  }
+
+  public ParquetRowGroupScan(ParquetStorageEngine engine, ParquetStorageEngineConfig config,
+                              List<RowGroupReadEntry> rowGroupReadEntries) throws SetupException {
+    parquetStorageEngine = engine;
+    engineConfig = config;
+    this.rowGroupReadEntries = rowGroupReadEntries;
+  }
+
+  public List<RowGroupReadEntry> getRowGroupReadEntries() {
+    return rowGroupReadEntries;
+  }
+
+  public StorageEngineConfig getEngineConfig() {
+    return engineConfig;
+  }
+
+  @Override
+  public OperatorCost getCost() {
+    return null;
+  }
+
+  @Override
+  public Size getSize() {
+    return null;
+  }
+
+  @Override
+  public boolean isExecutable() {
+    return false;
+  }
+
+  @JsonIgnore
+  public ParquetStorageEngine getStorageEngine(){
+    return parquetStorageEngine;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    Preconditions.checkArgument(children.isEmpty());
+    try {
+      return new ParquetRowGroupScan(parquetStorageEngine, (ParquetStorageEngineConfig) engineConfig, rowGroupReadEntries);
+    } catch (SetupException e) {
+      // TODO - handle this
+      e.printStackTrace();
+    }
+    return null;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Iterators.emptyIterator();
+  }
+
+  public static class RowGroupReadEntry extends ReadEntryFromHDFS {
+
+    private int rowGroupIndex;
+
+    @parquet.org.codehaus.jackson.annotate.JsonCreator
+    public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+                             @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex) {
+      super(path, start, length);
+      this.rowGroupIndex = rowGroupIndex;
+    }
+
+    @Override
+    public OperatorCost getCost() {
+      return new OperatorCost(1, 2, 1, 1);
+    }
+
+    @Override
+    public Size getSize() {
+      // TODO - these values are wrong, I cannot know these until after I read a file
+      return new Size(10, 10);
+    }
+
+    @JsonIgnore
+    public RowGroupReadEntry getRowGroupReadEntry() {
+      return new RowGroupReadEntry(this.getPath(), this.getStart(), this.getLength(), this.rowGroupIndex);
+    }
+
+    public int getRowGroupIndex(){
+      return rowGroupIndex;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
new file mode 100644
index 0000000..bd63406
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
+    long tA = System.nanoTime(), tB;
+    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ParquetScanBatchCreator.getBatch");
+    Preconditions.checkArgument(children.isEmpty());
+    List<RecordReader> readers = Lists.newArrayList();
+    for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
+      /*
+      Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
+      TODO - to prevent reading the footer again in the parquet record reader (it is read earlier in the ParquetStorageEngine)
+      we should add more information to the RowGroupInfo that will be populated upon the first read to
+      provide the reader with all of the file meta-data it needs
+      These fields will be added to the constructor below
+      */
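+      // a rough sketch of such a cache (hypothetical, not wired in here): a map created once before
+      // this loop, keyed by file path, so readFooter is only called on a cache miss
+      //   Map<String, ParquetMetadata> footerCache = new HashMap<String, ParquetMetadata>();
+      //   ParquetMetadata footer = footerCache.get(e.getPath());
+      //   if (footer == null) {
+      //     footer = ParquetFileReader.readFooter(
+      //         rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath()));
+      //     footerCache.put(e.getPath(), footer);
+      //   }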
+      try {
+        readers.add(
+            new ParquetRecordReader(
+                context, e.getPath(), e.getRowGroupIndex(), rowGroupScan.getStorageEngine().getFileSystem(),
+                rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+                ParquetFileReader.readFooter( rowGroupScan.getStorageEngine().getFileSystem().getConf(), new Path(e.getPath()))
+            )
+        );
+      } catch (IOException e1) {
+        throw new ExecutionSetupException(e1);
+      }
+    }
+    System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
+    return new ScanBatch(context, readers.iterator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
new file mode 100644
index 0000000..f070f0f
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java
@@ -0,0 +1,116 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.ReadEntry;
+import org.apache.drill.exec.physical.ReadEntryWithPath;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStorageEngine;
+import org.apache.drill.exec.store.RecordReader;
+
+import com.google.common.collect.ListMultimap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+
+public class ParquetStorageEngine extends AbstractStorageEngine{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetStorageEngine.class);
+
+  private final DrillbitContext context;
+  private final ParquetStorageEngineConfig configuration;
+  private FileSystem fs;
+  private Configuration conf;
+  static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+  private CodecFactoryExposer codecFactoryExposer;
+  final ParquetMetadata footer;
+  public static final String HADOOP_DEFAULT_NAME = "fs.default.name";
+
+
+  public ParquetStorageEngine(ParquetStorageEngineConfig configuration, DrillbitContext context){
+    this.context = context;
+    this.configuration = configuration;
+    this.footer = null;
+    try {
+      this.conf = new Configuration();
+      this.conf.set(HADOOP_DEFAULT_NAME, configuration.getDfsName());
+      this.fs = FileSystem.get(conf);
+      codecFactoryExposer = new CodecFactoryExposer(conf);
+    } catch (IOException ie) {
+      throw new RuntimeException("Error setting up filesystem");
+    }
+  }
+
+  public Configuration getHadoopConfig() {
+    return this.conf;
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return true;
+  }
+
+  @Override
+  public ParquetGroupScan getPhysicalScan(Scan scan) throws IOException {
+
+    ArrayList<ReadEntryWithPath> readEntries = scan.getSelection().getListWith(new ObjectMapper(),
+        new TypeReference<ArrayList<ReadEntryWithPath>>() {});
+
+    return new ParquetGroupScan(readEntries, this);
+  }
+
+  @Override
+  public ListMultimap<ReadEntry, DrillbitEndpoint> getReadLocations(Collection<ReadEntry> entries) {
+    return null;
+  }
+
+  @Override
+  public RecordReader getReader(FragmentContext context, ReadEntry readEntry) throws IOException {
+    return null;
+  }
+
+
+  public CodecFactoryExposer getCodecFactoryExposer() {
+    return codecFactoryExposer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
new file mode 100644
index 0000000..ad55f13
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngineConfig.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StorageEngineConfigBase;
+
+import java.util.HashMap;
+
+@JsonTypeName("parquet")
+public class ParquetStorageEngineConfig extends StorageEngineConfigBase {
+
+  public String getDfsName() {
+    return dfsName;
+  }
+
+  // information needed to identify an HDFS instance
+  private String dfsName;
+  private HashMap<String,String> map = new HashMap<>();
+
+  @JsonCreator
+  public ParquetStorageEngineConfig(@JsonProperty("dfsName") String dfsName) {
+    this.dfsName = dfsName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ParquetStorageEngineConfig that = (ParquetStorageEngineConfig) o;
+
+    if (dfsName != null ? !dfsName.equals(that.dfsName) : that.dfsName != null) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return dfsName != null ? dfsName.hashCode() : 0;
+  }
+  public void set(String key, String value) {
+    map.put(key, value);
+  }
+
+  public String get(String key) {
+    return map.get(key);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
new file mode 100644
index 0000000..08e1023
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/VarLenBinaryReader.java
@@ -0,0 +1,130 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store.parquet;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import parquet.bytes.BytesUtils;
+import parquet.column.ColumnDescriptor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+
+import java.io.IOException;
+import java.util.List;
+
+public class VarLenBinaryReader {
+
+  ParquetRecordReader parentReader;
+  final List<VarLengthColumn> columns;
+
+  public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn> columns){
+    this.parentReader = parentReader;
+    this.columns = columns;
+  }
+
+  public static class VarLengthColumn extends ColumnReader {
+
+    VarLengthColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v) {
+      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v);
+    }
+
+    @Override
+    protected void readField(long recordsToRead, ColumnReader firstColumnStatus) {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  /**
+   * Reads as many variable length values as possible.
+   *
+   * @param recordsToReadInThisPass - the number of records recommended for reading from the reader
+   * @param firstColumnStatus - a reference to the first column status in the parquet file to grab metadata from
+   * @return - the number of records read in this pass; the caller should read the same number of values
+   *           from the fixed length columns to keep the batch consistent
+   * @throws IOException
+   */
+  public long readFields(long recordsToReadInThisPass, ColumnReader firstColumnStatus) throws IOException {
+
+    long recordsReadInCurrentPass = 0;
+    int lengthVarFieldsInCurrentRecord;
+    boolean rowGroupFinished = false;
+    byte[] bytes;
+    VarBinaryVector currVec;
+    // write the first 0 offset
+    for (ColumnReader columnReader : columns) {
+      if (columnReader.isFixedLength) {
+        continue;
+      }
+      currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
+      currVec.getAccessor().getOffsetVector().getData().writeInt(0);
+      columnReader.bytesReadInCurrentPass = 0;
+      columnReader.valuesReadInCurrentPass = 0;
+    }
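+    // for each record: read the length of every variable width value in the record, check that the
+    // record still fits in the batch, and only then copy the values and advance the read positions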
+    do {
+      lengthVarFieldsInCurrentRecord = 0;
+      for (ColumnReader columnReader : columns) {
+        if (columnReader.isFixedLength) {
+          continue;
+        }
+        if (columnReader.pageReadStatus.currentPage == null
+            || columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
+          columnReader.totalValuesRead += columnReader.pageReadStatus.valuesRead;
+          if (!columnReader.pageReadStatus.next()) {
+            rowGroupFinished = true;
+            break;
+          }
+        }
+        bytes = columnReader.pageReadStatus.pageDataByteArray;
+
+        // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
+        columnReader.dataTypeLengthInBits = BytesUtils.readIntLittleEndian(bytes,
+            (int) columnReader.pageReadStatus.readPosInBytes);
+        lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits;
+
+      }
+      // check that the next record will fit in the batch: fixed width bits for all records read so far
+      // plus one more, plus this record's variable length bytes converted to bits
+      if (rowGroupFinished || (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() + lengthVarFieldsInCurrentRecord * 8
+          > parentReader.getBatchSize()){
+        break;
+      }
+      else{
+        recordsReadInCurrentPass++;
+      }
+      for (ColumnReader columnReader : columns) {
+        if (columnReader.isFixedLength) {
+          continue;
+        }
+        bytes = columnReader.pageReadStatus.pageDataByteArray;
+        currVec = (VarBinaryVector) columnReader.valueVecHolder.getValueVector();
+        // again, re-purposing the otherwise unused field here; it holds a length in BYTES, not bits
+        currVec.getAccessor().getOffsetVector().getData().writeInt((int) columnReader.bytesReadInCurrentPass  +
+            columnReader.dataTypeLengthInBits - 4 * (int) columnReader.valuesReadInCurrentPass);
+        currVec.getData().writeBytes(bytes, (int) columnReader.pageReadStatus.readPosInBytes + 4,
+            columnReader.dataTypeLengthInBits);
+        columnReader.pageReadStatus.readPosInBytes += columnReader.dataTypeLengthInBits + 4;
+        columnReader.bytesReadInCurrentPass += columnReader.dataTypeLengthInBits + 4;
+        columnReader.pageReadStatus.valuesRead++;
+        columnReader.valuesReadInCurrentPass++;
+        currVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
+        // reached the end of a page
+        if ( columnReader.pageReadStatus.valuesRead == columnReader.pageReadStatus.currentPage.getValueCount()) {
+          columnReader.pageReadStatus.next();
+        }
+      }
+    } while (recordsReadInCurrentPass < recordsToReadInThisPass);
+    return recordsReadInCurrentPass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index 4bfab47..54a6cb8 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -7,7 +7,7 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.DeadBuf;
 import org.apache.drill.exec.record.MaterializedField;
 
-abstract class BaseDataValueVector extends BaseValueVector{
+public abstract class BaseDataValueVector extends BaseValueVector{
 
   protected ByteBuf data = DeadBuf.DEAD_BUFFER;
   protected int valueCount;
@@ -47,6 +47,10 @@ abstract class BaseDataValueVector extends BaseValueVector{
   public FieldMetadata getMetadata() {
     return null;
   }
+
+  public ByteBuf getData(){
+    return data;
+  }
   
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
index 9fd33b9..2c86406 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/AbstractFragmentRunnerListener.java
@@ -91,9 +91,7 @@ public abstract class AbstractFragmentRunnerListener implements FragmentRunnerLi
   }
   
   protected abstract void statusChange(FragmentHandle handle, FragmentStatus status);
-    
-  
-  
+
   @Override
   public final void fail(FragmentHandle handle, String message, Throwable excep) {
     FragmentStatus.Builder status = getBuilder(FragmentState.FAILED);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
new file mode 100644
index 0000000..93f1af7
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemoteFragmentRunnerListener.java
@@ -0,0 +1,45 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.work;
+
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
+import org.apache.drill.exec.rpc.bit.BitTunnel;
+
+/**
+ * Informs remote node as fragment changes state.
+ */
+public class RemoteFragmentRunnerListener extends AbstractFragmentRunnerListener{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFragmentRunnerListener.class);
+  
+  private final BitTunnel tunnel;
+
+  public RemoteFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
+    super(context);
+    this.tunnel = tunnel;
+  }
+  
+  
+  @Override
+  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+    logger.debug("Sending remote failure.");
+    tunnel.sendFragmentStatus(status);
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
deleted file mode 100644
index ef7bcb1..0000000
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/RemotingFragmentRunnerListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*******************************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- ******************************************************************************/
-package org.apache.drill.exec.work;
-
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.Builder;
-import org.apache.drill.exec.proto.ExecProtos.FragmentStatus.FragmentState;
-import org.apache.drill.exec.rpc.bit.BitTunnel;
-import org.apache.drill.exec.work.foreman.ErrorHelper;
-
-/**
- * Informs remote node as fragment changes state.
- */
-public class RemotingFragmentRunnerListener extends AbstractFragmentRunnerListener{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemotingFragmentRunnerListener.class);
-  
-  private final BitTunnel tunnel;
-
-  public RemotingFragmentRunnerListener(FragmentContext context, BitTunnel tunnel) {
-    super(context);
-    this.tunnel = tunnel;
-  }
-  
-  
-  @Override
-  protected void statusChange(FragmentHandle handle, FragmentStatus status) {
-    logger.debug("Sending remote failure.");
-    tunnel.sendFragmentStatus(status);
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
index 2829dfd..b6e0159 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java
@@ -48,10 +48,10 @@ import org.apache.drill.exec.rpc.bit.BitConnection;
 import org.apache.drill.exec.rpc.bit.BitRpcConfig;
 import org.apache.drill.exec.rpc.bit.BitTunnel;
 import org.apache.drill.exec.work.FragmentRunner;
-import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.RemoteFragmentRunnerListener;
+import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.IncomingFragmentHandler;
-import org.apache.drill.exec.work.fragment.RemoteFragmentHandler;
 
 import com.google.common.collect.Maps;
 import com.google.protobuf.MessageLite;
@@ -116,7 +116,7 @@ public class BitComHandlerImpl implements BitComHandler {
     logger.debug("Received remote fragment start instruction", fragment);
     FragmentContext context = new FragmentContext(bee.getContext(), fragment.getHandle(), null, null,new FunctionImplementationRegistry(bee.getContext().getConfig()));
     BitTunnel tunnel = bee.getContext().getBitCom().getTunnel(fragment.getForeman());
-    RemotingFragmentRunnerListener listener = new RemotingFragmentRunnerListener(context, tunnel);
+    RemoteFragmentRunnerListener listener = new RemoteFragmentRunnerListener(context, tunnel);
     try{
       FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       RootExec exec = ImplCreator.getExec(context, rootOperator);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0a4614a..c9c23b5 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -166,7 +166,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
       LogicalPlan logicalPlan = context.getPlanReader().readLogicalPlan(json);
       logger.debug("Logical {}", logicalPlan.unparse(DrillConfig.create()));
       PhysicalPlan physicalPlan = convert(logicalPlan);
-      logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
+      //logger.debug("Physical {}", new ObjectMapper().writeValueAsString(physicalPlan));
       runPhysicalPlan(physicalPlan);
     } catch (IOException e) {
       fail("Failure while parsing logical plan.", e);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
index e4d0cfc..d63b4f4 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RemoteFragmentHandler.java
@@ -36,7 +36,7 @@ import org.apache.drill.exec.rpc.bit.BitTunnel;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.FragmentRunner;
 import org.apache.drill.exec.work.FragmentRunnerListener;
-import org.apache.drill.exec.work.RemotingFragmentRunnerListener;
+import org.apache.drill.exec.work.RemoteFragmentRunnerListener;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 /**
@@ -58,7 +58,7 @@ public class RemoteFragmentHandler implements IncomingFragmentHandler {
       this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
       this.buffers = new IncomingBuffers(root);
       this.context = new FragmentContext(context, fragment.getHandle(), null, buffers, new FunctionImplementationRegistry(context.getConfig()));
-      this.runnerListener = new RemotingFragmentRunnerListener(this.context, foremanTunnel);
+      this.runnerListener = new RemoteFragmentRunnerListener(this.context, foremanTunnel);
       this.reader = context.getPlanReader();
       
     }catch(IOException e){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
new file mode 100644
index 0000000..72322d5
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -0,0 +1,42 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import parquet.bytes.BytesInput;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+public class CodecFactoryExposer{
+
+  private CodecFactory codecFactory;
+
+  public CodecFactoryExposer(Configuration config){
+    codecFactory = new CodecFactory(config);
+  }
+
+  public CodecFactory getCodecFactory() {
+    return codecFactory;
+  }
+
+  public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
+    return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
+  }
+}
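
The only purpose of this class is to reach parquet-mr's package-private CodecFactory from Drill code, which is why it lives in the parquet.hadoop package. A rough sketch of how a record reader might decompress a page through it; the surrounding variables (conf, compressedPageBytes, uncompressedSize) are assumptions for illustration, not part of this patch:

    // Sketch only: decompress one compressed page buffer back to its original size.
    // conf, compressedPageBytes and uncompressedSize are assumed to come from the caller.
    CodecFactoryExposer exposer = new CodecFactoryExposer(conf);           // org.apache.hadoop.conf.Configuration
    BytesInput compressed = BytesInput.from(compressedPageBytes);          // byte[] read from the column chunk
    BytesInput uncompressed = exposer.decompress(compressed, uncompressedSize, CompressionCodecName.SNAPPY);
    byte[] pageData = uncompressed.toByteArray();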

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index a543197..a590420 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -8,17 +8,18 @@ drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
   	user.port : 31010,
-  	bit.port : 31011
+  	bit.port : 31011,
+  	use.ip : false
   },
   optimizer: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
   },
   
   zk: {
-	connect: "localhost:2181",
+	connect: "10.10.30.52:5181",
 	root: "/drill",
 	refresh: 500,
-	timeout: 1000,
+	timeout: 5000,
   	retry: {
   	  count: 7200,
   	  delay: 500
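
Note that the new zk.connect and timeout values point this module's defaults at a specific cluster rather than localhost; a deployment would normally override these keys rather than edit the module defaults. A quick way to check which values are actually in effect, assuming DrillConfig exposes the usual Typesafe Config accessors (as its use in the tests below suggests):

    // Sketch: print the effective ZooKeeper settings after configuration loading.
    DrillConfig config = DrillConfig.create();
    System.out.println("zk.connect = " + config.getString("drill.exec.zk.connect"));
    System.out.println("zk.timeout = " + config.getInt("drill.exec.zk.timeout"));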

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
index 1513c99..f6d83cc 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestDistributedFragmentRun.java
@@ -21,13 +21,17 @@ import static org.junit.Assert.*;
 
 import java.util.List;
 
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.LogicalPlan;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.pop.PopUnitTestBase;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.ValueVector;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
@@ -37,7 +41,6 @@ import com.google.common.io.Files;
 public class TestDistributedFragmentRun extends PopUnitTestBase{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestDistributedFragmentRun.class);
   
-  
   @Test 
   public void oneBitOneExchangeOneEntryRun() throws Exception{
     RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
new file mode 100644
index 0000000..ea1404b
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/ByteArrayUtil.java
@@ -0,0 +1,181 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+//TODO - make sure we figure out the license on these
+public class ByteArrayUtil {
+
+  public static byte[] toByta(Object data) throws Exception {
+    if (data instanceof Integer) return toByta((int) data);
+    else if (data instanceof Double) return toByta((double) data);
+    else if (data instanceof Float) return toByta((float) data);
+    else if (data instanceof Boolean) return toByta((boolean) data);
+    else if (data instanceof Long) return toByta((long) data);
+    else throw new Exception("Cannot convert that type to a byte array.");
+  }
+
+  // found at http://www.daniweb.com/software-development/java/code/216874/primitive-types-as-byte-arrays
+  // I have modified them to switch the endianness of integers and longs
+  /* ========================= */
+  /* "primitive type --> byte[] data" Methods */
+  /* ========================= */
+  public static byte[] toByta(byte data) {
+    return new byte[]{data};
+  }
+
+  public static byte[] toByta(byte[] data) {
+    return data;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(short data) {
+    return new byte[]{
+        (byte) ((data >> 8) & 0xff),
+        (byte) ((data >> 0) & 0xff),
+    };
+  }
+
+  public static byte[] toByta(short[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 2];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 2, 2);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(char data) {
+    return new byte[]{
+        (byte) ((data >> 8) & 0xff),
+        (byte) ((data >> 0) & 0xff),
+    };
+  }
+
+  public static byte[] toByta(char[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 2];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 2, 2);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(int data) {
+    return new byte[]{
+        (byte) ((data >> 0) & 0xff),
+        (byte) ((data >> 8) & 0xff),
+        (byte) ((data >> 16) & 0xff),
+        (byte) ((data >> 24) & 0xff),
+    };
+  }
+
+  public static byte[] toByta(int[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 4];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 4, 4);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(long data) {
+    return new byte[]{
+        (byte) ((data >> 0) & 0xff),
+        (byte) ((data >> 8) & 0xff),
+        (byte) ((data >> 16) & 0xff),
+        (byte) ((data >> 24) & 0xff),
+        (byte) ((data >> 32) & 0xff),
+        (byte) ((data >> 40) & 0xff),
+        (byte) ((data >> 48) & 0xff),
+        (byte) ((data >> 56) & 0xff),
+    };
+  }
+
+  public static byte[] toByta(long[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 8];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 8, 8);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(float data) {
+    return toByta(Float.floatToRawIntBits(data));
+  }
+
+  public static byte[] toByta(float[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 4];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 4, 4);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(double data) {
+    return toByta(Double.doubleToRawLongBits(data));
+  }
+
+  public static byte[] toByta(double[] data) {
+    if (data == null) return null;
+    // ----------
+    byte[] byts = new byte[data.length * 8];
+    for (int i = 0; i < data.length; i++)
+      System.arraycopy(toByta(data[i]), 0, byts, i * 8, 8);
+    return byts;
+  }
+
+  /* ========================= */
+  public static byte[] toByta(boolean data) {
+    return new byte[]{(byte) (data ? 0x01 : 0x00)}; // bool -> {1 byte}
+  }
+
+  public static byte[] toByta(boolean[] data) {
+    // Advanced Technique: The byte array contains information
+    // about how many boolean values are involved, so the exact
+    // array is returned when later decoded.
+    // ----------
+    if (data == null) return null;
+    // ----------
+    int len = data.length;
+    byte[] lena = toByta(len); // int conversion; length array = lena
+    byte[] byts = new byte[lena.length + (len / 8) + (len % 8 != 0 ? 1 : 0)];
+    // (Above) length-array-length + sets-of-8-booleans +? byte-for-remainder
+    System.arraycopy(lena, 0, byts, 0, lena.length);
+    // ----------
+    // (Below) algorithm by Matthew Cudmore: boolean[] -> bits -> byte[]
+    for (int i = 0, j = lena.length, k = 7; i < data.length; i++) {
+      byts[j] |= (data[i] ? 1 : 0) << k--;
+      if (k < 0) {
+        j++;
+        k = 7;
+      }
+    }
+    // ----------
+    return byts;
+  }
+
+  // above utility methods found here:
+  // http://www.daniweb.com/software-development/java/code/216874/primitive-types-as-byte-arrays
+}
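
As the comment above notes, toByta(int) and toByta(long) were flipped to little-endian byte order (short and char remain big-endian), which matches Parquet's plain encoding of fixed-width values. A small sanity sketch against java.nio:

    // Sketch: confirm the little-endian layout of toByta(int) against java.nio.
    // Requires java.nio.ByteBuffer, java.nio.ByteOrder and java.util.Arrays.
    byte[] expected = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(42).array();
    byte[] actual = ByteArrayUtil.toByta(42);
    assert Arrays.equals(expected, actual);   // both are {0x2A, 0x00, 0x00, 0x00}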

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
index 6b353ae..c9d6967 100644
--- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/JSONRecordReaderTest.java
@@ -52,6 +52,11 @@ public class JSONRecordReaderTest {
     }
 
     @Override
+    public void removeAllFields() {
+      addFields.clear();
+    }
+
+    @Override
     public void setNewSchema() throws SchemaChangeException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0a2f997f/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
new file mode 100644
index 0000000..f9a1ecb
--- /dev/null
+++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/MockScantTest.java
@@ -0,0 +1,115 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ ******************************************************************************/
+package org.apache.drill.exec.store;
+
+import com.beust.jcommander.internal.Lists;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.store.parquet.ParquetStorageEngine;
+import org.apache.drill.exec.vector.BaseDataValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.hadoop.Footer;
+import parquet.hadoop.ParquetFileWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.*;
+import static parquet.column.Encoding.PLAIN;
+
+
+public class MockScantTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScantTest.class);
+
+  private boolean VERBOSE_DEBUG = false;
+
+  private class ParquetResultListener implements UserResultsListener {
+
+    CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void submissionFailed(RpcException ex) {
+      latch.countDown();
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      if(result.getHeader().getIsLastChunk()) latch.countDown();
+      result.getData().release(1);
+    }
+
+    public void await() throws Exception {
+      latch.await();
+    }
+  }
+
+
+  @Test
+  public void testMockScanFullEngine() throws Exception{
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    DrillConfig config = DrillConfig.create();
+
+//    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
+    try(DrillClient client = new DrillClient(config)){
+      long A = System.nanoTime();
+//      bit1.run();
+      long B = System.nanoTime();
+      client.connect();
+      long C = System.nanoTime();
+      ParquetResultListener listener = new ParquetResultListener();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile("/mock-scan.json"), Charsets.UTF_8), listener);
+      listener.await();
+      long D = System.nanoTime();
+      System.out.println(String.format("Took %f s to run query", (float)(D-C) / 1E9));
+    }
+  }
+}
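
As committed, this test expects an externally started Drillbit reachable via the zk settings above; the embedded variant is commented out. For reference, the self-contained form would look roughly like the commented lines, e.g.:

    // Sketch, based on the commented-out lines: run the same query against an in-process Drillbit.
    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
    DrillConfig config = DrillConfig.create();
    try (Drillbit bit1 = new Drillbit(config, serviceSet);
         DrillClient client = new DrillClient(config, serviceSet.getCoordinator())) {
      bit1.run();
      client.connect();
      ParquetResultListener listener = new ParquetResultListener();
      client.runQuery(UserProtos.QueryType.PHYSICAL,
          Files.toString(FileUtils.getResourceAsFile("/mock-scan.json"), Charsets.UTF_8), listener);
      listener.await();
    }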