You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2014/04/23 05:14:47 UTC

[09/10] git commit: DRILL-442: Implement text format plugin

DRILL-442: Implement text format plugin

rename storage-engines.json storage-plugins.json
allow reading a particular value in a repeated vector
fix test caused by change that allows selecting element of repeated record
set def recordCount for explain query
fix bug loading repeated vectors
storage plugin/format plugin changes. store storage plugin configuration in distributed cache.
add repeated vector allocators
add support for for reading compressed files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/54287d07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/54287d07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/54287d07

Branch: refs/heads/master
Commit: 54287d0761f97f337035aa8988faf380178aba08
Parents: 7b6c7a1
Author: Steven Phillips <sp...@maprtech.com>
Authored: Sun Mar 16 18:56:50 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Tue Apr 22 20:05:15 2014 -0700

----------------------------------------------------------------------
 .../drill/common/util/DataInputInputStream.java |  15 +-
 .../drill/exec/store/hbase/HBaseGroupScan.java  |   9 +-
 .../drill/exec/store/hbase/HBaseSubScan.java    |   2 +-
 ...base_scan_screen_physical_column_select.json |   2 +-
 distribution/src/assemble/bin.xml               |   2 +-
 distribution/src/resources/drill-override.conf  |  36 +++-
 distribution/src/resources/storage-engines.json |  26 ---
 distribution/src/resources/storage-plugins.json |  49 ++++++
 .../codegen/templates/RepeatedValueVectors.java |  60 ++++++-
 .../templates/VariableLengthVectors.java        |   4 +-
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../drill/exec/cache/DrillSerializable.java     |   8 +-
 .../exec/cache/JacksonDrillSerializable.java    |  86 ++++++++++
 .../org/apache/drill/exec/cache/LocalCache.java |  18 +-
 .../cache/VectorAccessibleSerializable.java     |   6 +-
 .../drill/exec/expr/EvaluationVisitor.java      |   8 +-
 .../exec/expr/ExpressionTreeMaterializer.java   |  31 +++-
 .../exec/expr/ValueVectorReadExpression.java    |  28 ++-
 .../drill/exec/expr/fn/impl/Alternator.java     |   4 +
 .../apache/drill/exec/opt/BasicOptimizer.java   |   6 +-
 .../impl/project/ProjectRecordBatch.java        |   8 +-
 .../impl/svremover/RemovingRecordBatch.java     |   8 +-
 .../drill/exec/planner/PhysicalPlanReader.java  |   4 +-
 .../drill/exec/planner/logical/DrillOptiq.java  |   2 +-
 .../exec/planner/logical/StorageEngines.java    |  74 --------
 .../exec/planner/logical/StoragePlugins.java    |  93 ++++++++++
 .../org/apache/drill/exec/server/Drillbit.java  |   1 +
 .../drill/exec/store/StoragePluginRegistry.java | 119 ++++++++-----
 .../exec/store/dfs/BasicFormatMatcher.java      |  44 ++++-
 .../drill/exec/store/dfs/FileSystemConfig.java  |  13 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |   2 +
 .../drill/exec/store/dfs/FormatCreator.java     |   5 +-
 .../exec/store/dfs/NamedFormatPluginConfig.java |   3 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |  24 ++-
 .../exec/store/dfs/easy/EasyGroupScan.java      |  36 ++--
 .../drill/exec/store/dfs/easy/EasySubScan.java  |  12 +-
 .../exec/store/easy/json/JSONFormatPlugin.java  |   4 +-
 .../exec/store/easy/text/TextFormatPlugin.java  |  98 +++++++++++
 .../apache/drill/exec/store/hive/HiveScan.java  |  10 +-
 .../exec/store/parquet/ParquetFormatConfig.java |   1 +
 .../exec/store/parquet/ParquetGroupScan.java    |   2 +-
 .../exec/store/schedule/AssignmentCreator.java  |   2 +
 .../exec/store/schedule/BlockMapBuilder.java    |  10 +-
 .../exec/store/schedule/CompleteFileWork.java   |   5 +
 .../exec/store/text/DrillTextRecordReader.java  | 169 +++++++++++++++++++
 .../org/apache/drill/exec/util/VectorUtil.java  |   5 +-
 .../vector/RepeatedVariableWidthVector.java     |   8 +-
 .../RepeatedVariableEstimatedAllocator.java     |  36 ++++
 .../allocator/RepeatedVectorAllocator.java      |  36 ++++
 .../exec/vector/allocator/VectorAllocator.java  |   7 +-
 .../src/main/resources/drill-module.conf        |   8 +-
 .../java/org/apache/drill/PlanningBase.java     |  10 ++
 .../exec/physical/impl/join/TestMergeJoin.java  |   3 +
 .../record/ExpressionTreeMaterializerTest.java  |   9 +-
 .../drill/exec/store/TestOrphanSchema.java      |   9 +-
 .../drill/exec/store/ischema/OrphanSchema.java  |  11 +-
 .../exec/store/ischema/TestOrphanSchema.java    |   8 +-
 .../exec/store/text/TextRecordReaderTest.java   |  88 ++++++++++
 .../src/test/resources/storage-plugins.json     |  40 +++++
 .../src/test/resources/store/text/regions.csv   |   5 +
 .../src/test/resources/store/text/test.json     |  40 +++++
 pom.xml                                         |   1 +
 .../src/test/resources/storage-engines.json     |  27 ---
 .../src/test/resources/storage-plugins.json     |  47 ++++++
 64 files changed, 1280 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java b/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
index c4c2282..f61b301 100644
--- a/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
+++ b/common/src/main/java/org/apache/drill/common/util/DataInputInputStream.java
@@ -17,6 +17,9 @@
  */
 package org.apache.drill.common.util;
 
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+
 import java.io.*;
 
 
@@ -62,7 +65,17 @@ public class DataInputInputStream extends InputStream {
 
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
-    in.readFully(b, off, len);
+    for (int i = off; i < off + len; i++) {
+      try {
+        b[i] = in.readByte();
+      } catch(Exception e) {
+        if (ExceptionUtils.getRootCause(e) instanceof EOFException) {
+          return i - off;
+        } else {
+          throw e;
+        }
+      }
+    }
     return len;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index bb0adcc..b8b6af4 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -76,14 +76,13 @@ public class HBaseGroupScan extends AbstractGroupScan {
 
   @JsonCreator
   public HBaseGroupScan(@JsonProperty("entries") List<HTableReadEntry> entries,
-                          @JsonProperty("storage") HBaseStoragePluginConfig storageEngineConfig,
+                          @JsonProperty("storage") HBaseStoragePluginConfig storagePluginConfig,
                           @JsonProperty("columns") List<SchemaPath> columns,
-                          @JacksonInject StoragePluginRegistry engineRegistry
+                          @JacksonInject StoragePluginRegistry pluginRegistry
                            )throws IOException, ExecutionSetupException {
     Preconditions.checkArgument(entries.size() == 1);
-    engineRegistry.init(DrillConfig.create());
-    this.storagePlugin = (HBaseStoragePlugin) engineRegistry.getEngine(storageEngineConfig);
-    this.storagePluginConfig = storageEngineConfig;
+    this.storagePlugin = (HBaseStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig);
+    this.storagePluginConfig = storagePluginConfig;
     this.tableName = entries.get(0).getTableName();
     this.columns = columns;
     getRegionInfos();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
index 0e8a934..81a8af5 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSubScan.java
@@ -57,7 +57,7 @@ public class HBaseSubScan extends AbstractBase implements SubScan {
   public HBaseSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("storage") StoragePluginConfig storage,
                       @JsonProperty("rowGroupReadEntries") LinkedList<HBaseSubScanReadEntry> rowGroupReadEntries,
                       @JsonProperty("columns") List<SchemaPath> columns) throws ExecutionSetupException {
-    hbaseStoragePlugin = (HBaseStoragePlugin) registry.getEngine(storage);
+    hbaseStoragePlugin = (HBaseStoragePlugin) registry.getPlugin(storage);
     this.rowGroupReadEntries = rowGroupReadEntries;
     this.storage = storage;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
index c3d356d..7940c65 100644
--- a/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
+++ b/contrib/storage-hbase/src/test/resources/hbase/hbase_scan_screen_physical_column_select.json
@@ -20,7 +20,7 @@
       "zookeeperPort" : 2181
     },
     columns: [
-      "f2.c1", "f2.c2", "row_key"
+      "`f2`.c1", "`f2`.c2", "row_key"
     ]
   },
   {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 1579832..b981355 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -137,7 +137,7 @@
       <outputDirectory>conf</outputDirectory>
     </file>
     <file>
-      <source>src/resources/storage-engines.json</source>
+      <source>src/resources/storage-plugins.json</source>
       <outputDirectory>conf</outputDirectory>
     </file>
   </files>   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index e837e74..a5e5522 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -51,9 +51,15 @@ drill.exec: {
   },
   functions: ["org.apache.drill.expr.fn.impl"],
   storage: {
-    packages += "org.apache.drill.exec.store"  
+    packages += "org.apache.drill.exec.store",
+    file: {
+      text: {
+        buffer.size: 262144,
+        batch.size: 4000
+      }
+    }
   },
-  metrics : { 
+  metrics : {
     context: "drillbit",
     jmx: {
       enabled : true
@@ -71,7 +77,7 @@ drill.exec: {
   	retry: {
   	  count: 7200,
   	  delay: 500
-  	}    
+  	}
   },
   functions: ["org.apache.drill.expr.fn.impl"],
   network: {
@@ -90,10 +96,26 @@ drill.exec: {
     directories: ["/tmp/drill"],
     filesystem: "drill-local:///"
   },
-  spooling: {
+  buffer:{
     impl: "org.apache.drill.exec.work.batch.SpoolingRawBatchBuffer",
-    delete: false,
-    size: 100000000
+    size: "20000",
+    spooling: {
+      delete: false,
+      size: 100000000
+    }
   },
-  sort.purge.threshold : 100
+  cache.hazel.subnets: ["*.*.*.*"],
+  sort: {
+    purge.threshold : 100,
+    external: {
+      batch.size : 4000,
+      spill: {
+        batch.size : 4000,
+        group.size : 100,
+        threshold : 200,
+        directories : [ "/tmp/drill/spill" ],
+        fs : "file:///"
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/storage-engines.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-engines.json b/distribution/src/resources/storage-engines.json
deleted file mode 100644
index 8b22858..0000000
--- a/distribution/src/resources/storage-engines.json
+++ /dev/null
@@ -1,26 +0,0 @@
-{
-  "storage":{
-    dfs: {
-      type: "file",
-      connection: "file:///"
-    },
-    cp: {
-      type: "file",
-      connection: "classpath:///"
-    } 
-
-    /*,
-    hive : {
-        type:"hive",
-        config :
-          {
-            "hive.metastore.uris" : "",
-            "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
-            "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
-            "fs.default.name" : "file:///",
-            "hive.metastore.sasl.enabled" : "false"
-          }
-      }
-      */
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
new file mode 100644
index 0000000..6f2c015
--- /dev/null
+++ b/distribution/src/resources/storage-plugins.json
@@ -0,0 +1,49 @@
+{
+  "storage":{
+    dfs: {
+      type: "file",
+      connection: "file:///",
+      formats: {
+        "psv" : {
+          type: "text",
+          extensions: [ "tbl" ],
+          delimiter: "|"
+        },
+        "csv" : {
+          type: "text",
+          extensions: [ "csv" ],
+          delimiter: ","
+        },
+        "tsv" : {
+          type: "text",
+          extensions: [ "tsv" ],
+          delimiter: "\t"
+        },
+        "parquet" : {
+          type: "parquet"
+        },
+        "json" : {
+          type: "json"
+        }
+      }
+    },
+    cp: {
+      type: "file",
+      connection: "classpath:///"
+    } 
+
+    /*,
+    hive : {
+        type:"hive",
+        config :
+          {
+            "hive.metastore.uris" : "",
+            "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
+            "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
+            "fs.default.name" : "file:///",
+            "hive.metastore.sasl.enabled" : "false"
+          }
+      }
+      */
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 8d5e90a..35bd480 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -108,15 +108,37 @@ package org.apache.drill.exec.vector;
       to.copyFrom(fromIndex, toIndex, Repeated${minor.class}Vector.this);
     }
   }
-  
-  public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
-    throw new UnsupportedOperationException();
-  }
-  
-  public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
-    throw new UnsupportedOperationException();
-  }
-  
+
+<#if type.major == "VarLen">
+    public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+      int count = v.getAccessor().getCount(inIndex);
+      getMutator().startNewGroup(outIndex);
+      for (int i = 0; i < count; i++) {
+        getMutator().add(outIndex, v.getAccessor().get(inIndex, i));
+      }
+    }
+
+    public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+      int count = v.getAccessor().getCount(inIndex);
+      getMutator().startNewGroup(outIndex);
+      for (int i = 0; i < count; i++) {
+        if (!getMutator().addSafe(outIndex, v.getAccessor().get(inIndex, i))) {
+          return false;
+        }
+      }
+      return true;
+    }
+<#else>
+
+    public void copyFrom(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+        throw new UnsupportedOperationException();
+    }
+
+    public boolean copyFromSafe(int inIndex, int outIndex, Repeated${minor.class}Vector v){
+        throw new UnsupportedOperationException();
+    }
+</#if>
+
   <#if type.major == "VarLen">
   @Override
   public FieldMetadata getMetadata() {
@@ -131,6 +153,7 @@ package org.apache.drill.exec.vector;
   
   public void allocateNew(int totalBytes, int parentValueCount, int childValueCount) {
     offsets.allocateNew(parentValueCount+1);
+    offsets.getMutator().set(0,0);
     values.allocateNew(totalBytes, childValueCount);
     mutator.reset();
     accessor.reset();
@@ -261,6 +284,12 @@ package org.apache.drill.exec.vector;
       holder.vector = values;
     }
 
+    public void get(int index, int positionIndex, ${minor.class}Holder holder) {
+      int offset = offsets.getAccessor().get(index);
+      assert offset >= 0;
+      values.getAccessor().get(offset + positionIndex, holder);
+    }
+
     public MaterializedField getField() {
       return field;
     }
@@ -301,6 +330,19 @@ package org.apache.drill.exec.vector;
       offsets.getMutator().set(index+1, nextOffset+1);
     }
 
+    <#if type.major == "VarLen">
+    public boolean addSafe(int index, byte[] bytes) {
+      return addSafe(index, bytes, 0, bytes.length);
+    }
+
+    public boolean addSafe(int index, byte[] bytes, int start, int length) {
+      int nextOffset = offsets.getAccessor().get(index+1);
+      boolean b1 = values.getMutator().setSafe(nextOffset, bytes, start, length);
+      boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1);
+      return (b1 && b2);
+    }
+    </#if>
+
     public void add(int index, ${minor.class}Holder holder){
       int nextOffset = offsets.getAccessor().get(index+1);
       values.getMutator().set(nextOffset, holder);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index dcc7135..9cec943 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -323,7 +323,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
 
       if (data.capacity() < currentOffset + length) return false;
 
-      offsetVector.getMutator().set(index + 1, currentOffset + length);
+      if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + length)) {
+        return false;
+      }
       data.setBytes(currentOffset, bytes, start, length);
       return true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index be59ea6..f88b1b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -58,5 +58,7 @@ public interface ExecConstants {
   public static final String EXTERNAL_SORT_SPILL_THRESHOLD = "drill.exec.sort.external.spill.threshold";
   public static final String EXTERNAL_SORT_SPILL_DIRS = "drill.exec.sort.external.spill.directories";
   public static final String EXTERNAL_SORT_SPILL_FILESYSTEM = "drill.exec.sort.external.spill.fs";
+  public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
+  public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 875e8b6..4f266f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,6 +17,10 @@
  */
 package org.apache.drill.exec.cache;
 
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
 import java.io.*;
 
 /**
@@ -24,8 +28,8 @@ import java.io.*;
  */
 public interface DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
-  public void read(DataInput input) throws IOException;
+  public void readData(ObjectDataInput input) throws IOException;
   public void readFromStream(InputStream input) throws IOException;
-  public void write(DataOutput output) throws IOException;
+  public void writeData(ObjectDataOutput output) throws IOException;
   public void writeToStream(OutputStream output) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
new file mode 100644
index 0000000..a7b0be2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+public abstract class JacksonDrillSerializable<T> implements DrillSerializable, DataSerializable{
+  private ObjectMapper mapper;
+  private T obj;
+
+  public JacksonDrillSerializable(DrillbitContext context, T obj) {
+    this.mapper = context.getConfig().getMapper();
+    this.obj = obj;
+  }
+
+  public JacksonDrillSerializable() {
+  }
+
+  @Override
+  public void readData(ObjectDataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+
+  public void readFromStream(InputStream input, Class clazz) throws IOException {
+    mapper = DrillConfig.create().getMapper();
+    obj = (T) mapper.readValue(input, clazz);
+  }
+
+  @Override
+  public void writeData(ObjectDataOutput output) throws IOException {
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    output.write(mapper.writeValueAsBytes(obj));
+  }
+
+  public T getObj() {
+    return obj;
+  }
+
+  public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
+
+    public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
+      super(context, obj);
+    }
+
+    public StoragePluginsSerializable(BufferAllocator allocator) {
+    }
+
+    public StoragePluginsSerializable() {
+    }
+
+    @Override
+    public void readFromStream(InputStream input) throws IOException {
+      readFromStream(input, StoragePlugins.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 38de688..119764b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -17,7 +17,10 @@
  */
 package org.apache.drill.exec.cache;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.List;
@@ -27,6 +30,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
@@ -109,8 +114,14 @@ public class LocalCache implements DistributedCache {
 
   public static ByteArrayDataOutput serialize(DrillSerializable obj) {
     ByteArrayDataOutput out = ByteStreams.newDataOutput();
+    OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
     try {
-      obj.write(out);
+      obj.writeToStream(outputStream);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    try {
+      outputStream.flush();
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -119,9 +130,10 @@ public class LocalCache implements DistributedCache {
 
   public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
     ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
+    InputStream inputStream = DataInputInputStream.constructInputStream(in);
     try {
       V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
-      obj.read(in);
+      obj.readFromStream(inputStream);
       return obj;
     } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
       throw new RuntimeException(e);
@@ -164,6 +176,8 @@ public class LocalCache implements DistributedCache {
     @Override
     public V get(String key) {
       if (m.get(key) == null) return null;
+      ByteArrayDataOutput b = m.get(key);
+      byte[] bytes = b.toByteArray();
       return (V) deserialize(m.get(key).toByteArray(), this.clazz);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index ff7ab02..9511992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -21,6 +21,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
 import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
@@ -82,7 +84,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   }
 
   @Override
-  public void read(DataInput input) throws IOException {
+  public void readData(ObjectDataInput input) throws IOException {
     readFromStream(DataInputInputStream.constructInputStream(input));
   }
 
@@ -125,7 +127,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   }
 
   @Override
-  public void write(DataOutput output) throws IOException {
+  public void writeData(ObjectDataOutput output) throws IOException {
     writeToStream(DataOutputOutputStream.constructOutputStream(output));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index 2e632a3..fd547e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.Set;
 
 import io.netty.buffer.ByteBuf;
+import com.google.common.collect.Lists;
 import org.apache.drill.common.expression.CastExpression;
 import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.FunctionHolderExpression;
@@ -45,6 +46,7 @@ import org.apache.drill.common.expression.ValueExpressions.Decimal28Expression;
 import org.apache.drill.common.expression.ValueExpressions.Decimal38Expression;
 import org.apache.drill.common.expression.ValueExpressions.QuotedString;
 import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
@@ -332,7 +334,11 @@ public class EvaluationVisitor {
         }
       } else {
         if (Types.usesHolderForGet(e.getMajorType())) {
-          generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
+          if (e.isArrayElement()) {
+            generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(JExpr.lit(e.getIndex())).arg(out.getHolder()));
+          } else {
+            generator.getEvalBlock().add(getValueAccessor.arg(indexVariable).arg(out.getHolder()));
+          }
         } else {
           generator.getEvalBlock().assign(out.getValue(), getValueAccessor.arg(indexVariable));
         }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
index f9572db..e3b1002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ExpressionTreeMaterializer.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.expr;
 
 import java.util.List;
 
+import com.google.common.base.Joiner;
 import org.apache.drill.common.expression.CastExpression;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -26,6 +27,7 @@ import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.FunctionHolderExpression;
 import org.apache.drill.common.expression.IfExpression;
 import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.expression.TypedNullConstant;
 import org.apache.drill.common.expression.ValueExpressions;
@@ -228,12 +230,37 @@ public class ExpressionTreeMaterializer {
     @Override
     public LogicalExpression visitSchemaPath(SchemaPath path, FunctionImplementationRegistry value) {
 //      logger.debug("Visiting schema path {}", path);
-      TypedFieldId tfId = batch.getValueVectorId(path);
+      PathSegment seg = path.getRootSegment();
+      List<String> segments = Lists.newArrayList();
+      segments.add(seg.getNameSegment().getPath().toString());
+      boolean isArrayElement = false;
+      int index = -1;
+      while((seg = seg.getChild()) != null) {
+        if (seg.isNamed()) {
+          segments.add(seg.getNameSegment().getPath().toString());
+          if (seg.isLastPath()) {
+            break;
+          }
+        } else {
+          if (!seg.isLastPath()) {
+            throw new UnsupportedOperationException("Repeated map type not supported");
+          }
+          index = seg.getArraySegment().getIndex();
+          isArrayElement = true;
+          break;
+        }
+      }
+      SchemaPath newPath = SchemaPath.getCompoundPath((String[]) segments.toArray(new String[0]));
+      TypedFieldId tfId = batch.getValueVectorId(newPath);
       if (tfId == null) {
         logger.warn("Unable to find value vector of path {}, returning null instance.", path);
         return NullExpression.INSTANCE;
       } else {
-        return new ValueVectorReadExpression(tfId);
+        ValueVectorReadExpression e = new ValueVectorReadExpression(tfId, index, isArrayElement);
+        if (isArrayElement) {
+          e.required();
+        }
+        return e;
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
index 5e251a1..4ba503d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/ValueVectorReadExpression.java
@@ -22,23 +22,43 @@ import java.util.Iterator;
 import org.apache.drill.common.expression.ExpressionPosition;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.visitors.ExprVisitor;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.record.TypedFieldId;
 
 import com.google.common.collect.Iterators;
 
+import javax.sound.sampled.FloatControl;
+
 public class ValueVectorReadExpression implements LogicalExpression{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ValueVectorReadExpression.class);
 
-  private final MajorType type;
+  private MajorType type;
   private final TypedFieldId fieldId;
   private final boolean superReader;
+  private final int index;
+  private final boolean isArrayElement;
   
   
-  public ValueVectorReadExpression(TypedFieldId tfId){
+  public ValueVectorReadExpression(TypedFieldId tfId, int index, boolean isArrayElement){
     this.type = tfId.getType();
     this.fieldId = tfId;
     this.superReader = tfId.isHyperReader();
+    this.index = index;
+    this.isArrayElement = isArrayElement;
+  }
+
+  public void required() {
+    type = Types.required(type.getMinorType());
+  }
+
+  public boolean isArrayElement() {
+    return isArrayElement;
+  }
+
+  public ValueVectorReadExpression(TypedFieldId tfId) {
+    this(tfId, -1, false);
   }
   
   public TypedFieldId getTypedFieldId(){
@@ -62,6 +82,10 @@ public class ValueVectorReadExpression implements LogicalExpression{
     return fieldId;
   }
 
+  public int getIndex() {
+    return index;
+  }
+
   @Override
   public ExpressionPosition getPosition() {
     return ExpressionPosition.UNKNOWN;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
index bdd227c..641063b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/Alternator.java
@@ -17,12 +17,16 @@
  */
 package org.apache.drill.exec.expr.fn.impl;
 
+import org.apache.drill.exec.expr.DrillAggFunc;
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
 import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
 import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
 import org.apache.drill.exec.record.RecordBatch;
 
 public class Alternator {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 97ec026..d7d5ccb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -184,10 +184,10 @@ public class BasicOptimizer extends Optimizer{
     public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
       StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
       if(config == null) throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
-      StoragePlugin engine;
+      StoragePlugin storagePlugin;
       try {
-        engine = context.getStorage().getEngine(config);
-        return engine.getPhysicalScan(scan.getSelection());
+        storagePlugin = context.getStorage().getPlugin(config);
+        return storagePlugin.getPhysicalScan(scan.getSelection());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index aaee8e7..e8ee3cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -30,6 +30,7 @@ import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
@@ -140,9 +141,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
 
 
         // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
-        if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE &&
-                !isAnyWildcard &&
-                !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())) {
+        if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+                && !isAnyWildcard
+                &&!transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldId())
+                && !((ValueVectorReadExpression) expr).isArrayElement()) {
           ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
           ValueVector vvIn = incoming.getValueAccessorById(vectorRead.getFieldId().getFieldId(), TypeHelper.getValueVectorClass(vectorRead.getMajorType().getMinorType(), vectorRead.getMajorType().getMode())).getValueVector();
           Preconditions.checkNotNull(incoming);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1377881..499f4d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -28,8 +28,14 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TransferPair;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.FixedWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index a100163..167a992 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -48,7 +48,7 @@ public class PhysicalPlanReader {
   private final ObjectReader logicalPlanReader;
 
   public PhysicalPlanReader(DrillConfig config, ObjectMapper mapper, final DrillbitEndpoint endpoint,
-                            final StoragePluginRegistry engineRegistry) {
+                            final StoragePluginRegistry pluginRegistry) {
 
     // Endpoint serializer/deserializer.
     SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
@@ -61,7 +61,7 @@ public class PhysicalPlanReader {
     mapper.registerModule(deserModule);
     mapper.registerSubtypes(PhysicalOperatorUtil.getSubTypes(config));
     InjectableValues injectables = new InjectableValues.Std() //
-            .addValue(StoragePluginRegistry.class, engineRegistry) //
+            .addValue(StoragePluginRegistry.class, pluginRegistry) //
         .addValue(DrillbitEndpoint.class, endpoint); //
 
     this.mapper = mapper;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
index bc2178b..46eed45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java
@@ -157,7 +157,7 @@ public class DrillOptiq {
         if (call.getOperator() == SqlStdOperatorTable.ITEM) {
           SchemaPath left = (SchemaPath) call.getOperands().get(0).accept(this);
           final RexLiteral literal = (RexLiteral) call.getOperands().get(1);
-          return left.getChild((String) literal.getValue2());
+          return left.getChild(literal.getValue2().toString());
         }
         
         // fall through

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
deleted file mode 100644
index d298040..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StorageEngines.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.planner.logical;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.logical.StoragePluginConfig;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Charsets;
-import com.google.common.io.Resources;
-
-public class StorageEngines implements Iterable<Map.Entry<String, StoragePluginConfig>>{
-  
-  private Map<String, StoragePluginConfig> storage;
-  
-  @JsonCreator
-  public StorageEngines(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
-    this.storage = storage;
-  }
-  
-  public static void main(String[] args) throws Exception{
-    DrillConfig config = DrillConfig.create();
-    String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
-    StorageEngines se = config.getMapper().readValue(data,  StorageEngines.class);
-    System.out.println(se);
-  }
-
-  @Override
-  public String toString() {
-    final int maxLen = 10;
-    return "StorageEngines [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
-  }
-
-  @Override
-  public Iterator<Entry<String, StoragePluginConfig>> iterator() {
-    return storage.entrySet().iterator();
-  }
-
-  private String toString(Collection<?> collection, int maxLen) {
-    StringBuilder builder = new StringBuilder();
-    builder.append("[");
-    int i = 0;
-    for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
-      if (i > 0)
-        builder.append(", ");
-      builder.append(iterator.next());
-    }
-    builder.append("]");
-    return builder.toString();
-  }
-  
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
new file mode 100644
index 0000000..939b77c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.logical.StoragePluginConfig;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+
+public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{
+  
+  private Map<String, StoragePluginConfig> storage;
+  
+  @JsonCreator
+  public StoragePlugins(@JsonProperty("storage") Map<String, StoragePluginConfig> storage){
+    this.storage = storage;
+  }
+  
+  public static void main(String[] args) throws Exception{
+    DrillConfig config = DrillConfig.create();
+    String data = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
+    StoragePlugins se = config.getMapper().readValue(data,  StoragePlugins.class);
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    config.getMapper().writeValue(System.out, se);
+    config.getMapper().writeValue(os, se);
+    se = config.getMapper().readValue(new ByteArrayInputStream(os.toByteArray()), StoragePlugins.class);
+    System.out.println(se);
+  }
+
+  @JsonProperty("storage")
+  public Map<String, StoragePluginConfig> getStorage() {
+    return storage;
+  }
+
+  @Override
+  public String toString() {
+    final int maxLen = 10;
+    return "StoragePlugins [storage=" + (storage != null ? toString(storage.entrySet(), maxLen) : null) + "]";
+  }
+
+  @Override
+  public Iterator<Entry<String, StoragePluginConfig>> iterator() {
+    return storage.entrySet().iterator();
+  }
+
+  private String toString(Collection<?> collection, int maxLen) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("[");
+    int i = 0;
+    for (Iterator<?> iterator = collection.iterator(); iterator.hasNext() && i < maxLen; i++) {
+      if (i > 0)
+        builder.append(", ");
+      builder.append(iterator.next());
+    }
+    builder.append("]");
+    return builder.toString();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof StoragePlugins)) {
+      return false;
+    }
+    return storage.equals(((StoragePlugins) obj).getStorage());
+  }
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 6400c75..411a76c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -95,6 +95,7 @@ public class Drillbit implements Closeable{
     DrillbitEndpoint md = engine.start();
     manager.start(md, cache, engine.getController(), engine.getDataConnectionCreator(), coord);
     cache.run();
+    manager.getContext().getStorage().init();
     handle = coord.register(md);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
index 2386915..7a88098 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java
@@ -20,12 +20,14 @@ package org.apache.drill.exec.store;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.net.URL;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import com.google.common.base.Preconditions;
 import net.hydromatic.linq4j.expressions.DefaultExpression;
 import net.hydromatic.linq4j.expressions.Expression;
 import net.hydromatic.optiq.SchemaPlus;
@@ -37,7 +39,10 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.util.PathScanner;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.planner.logical.StorageEngines;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.JacksonDrillSerializable.StoragePluginsSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.rpc.user.DrillUser;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
@@ -53,8 +58,8 @@ import com.google.common.io.Resources;
 public class StoragePluginRegistry implements Iterable<Map.Entry<String, StoragePlugin>>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistry.class);
 
-  private Map<Object, Constructor<? extends StoragePlugin>> availableEngines = new HashMap<Object, Constructor<? extends StoragePlugin>>();
-  private final ImmutableMap<String, StoragePlugin> engines;
+  private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>();
+  private ImmutableMap<String, StoragePlugin> plugins;
 
   private DrillbitContext context;
   private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory();
@@ -64,67 +69,93 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   public StoragePluginRegistry(DrillbitContext context) {
     try{
     this.context = context;
-    init(context.getConfig());
-    this.engines = ImmutableMap.copyOf(createEngines());
     }catch(RuntimeException e){
-      logger.error("Failure while loading storage engine registry.", e);
+      logger.error("Failure while loading storage plugin registry.", e);
       throw new RuntimeException("Faiure while reading and loading storage plugin configuration.", e);
     }
   }
 
   @SuppressWarnings("unchecked")
-  public void init(DrillConfig config){
-    Collection<Class<? extends StoragePlugin>> engines = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
-    logger.debug("Loading storage engines {}", engines);
-    for(Class<? extends StoragePlugin> engine: engines){
+  public void init() throws DrillbitStartupException {
+    DrillConfig config = context.getConfig();
+    Collection<Class<? extends StoragePlugin>> plugins = PathScanner.scanForImplementations(StoragePlugin.class, config.getStringList(ExecConstants.STORAGE_ENGINE_SCAN_PACKAGES));
+    logger.debug("Loading storage plugins {}", plugins);
+    for(Class<? extends StoragePlugin> plugin: plugins){
       int i =0;
-      for(Constructor<?> c : engine.getConstructors()){
+      for(Constructor<?> c : plugin.getConstructors()){
         Class<?>[] params = c.getParameterTypes();
         if(params.length != 3 || params[1] != DrillbitContext.class || !StoragePluginConfig.class.isAssignableFrom(params[0]) || params[2] != String.class){
-          logger.info("Skipping StorageEngine constructor {} for engine class {} since it doesn't implement a [constructor(StorageEngineConfig, DrillbitContext, String)]", c, engine);
+          logger.info("Skipping StoragePlugin constructor {} for plugin class {} since it doesn't implement a [constructor(StoragePluginConfig, DrillbitContext, String)]", c, plugin);
           continue;
         }
-        availableEngines.put(params[0], (Constructor<? extends StoragePlugin>) c);
+        availablePlugins.put(params[0], (Constructor<? extends StoragePlugin>) c);
         i++;
       }
       if(i == 0){
-        logger.debug("Skipping registration of StorageEngine {} as it doesn't have a constructor with the parameters of (StorangeEngineConfig, Config)", engine.getCanonicalName());
+        logger.debug("Skipping registration of StoragePlugin {} as it doesn't have a constructor with the parameters of (StorangePluginConfig, Config)", plugin.getCanonicalName());
       }
     }
 
+    this.plugins = ImmutableMap.copyOf(createPlugins());
 
   }
-
-  private Map<String, StoragePlugin> createEngines(){
-    StorageEngines engines = null;
-    Map<String, StoragePlugin> activeEngines = new HashMap<String, StoragePlugin>();
+  
+  private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException {
+    /*
+     * Check if "storage-plugins.json" exists. Also check if "storage-plugins" object exists in Distributed Cache.
+     * If both exist, check that they are the same. If they differ, throw exception. If "storage-plugins.json" exists, but
+     * nothing found in cache, then add it to the cache. If neither are found, throw exception.
+     */
+    StoragePlugins plugins = null;
+    StoragePlugins cachedPlugins = null;
+    Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>();
     try{
-      String enginesData = Resources.toString(Resources.getResource("storage-engines.json"), Charsets.UTF_8);
-      engines = context.getConfig().getMapper().readValue(enginesData, StorageEngines.class);
+      URL url = Resources.class.getClassLoader().getResource("storage-plugins.json");
+      if (url != null) {
+        String pluginsData = Resources.toString(url, Charsets.UTF_8);
+        plugins = context.getConfig().getMapper().readValue(pluginsData, StoragePlugins.class);
+      }
+      DistributedMap<StoragePluginsSerializable> map = context.getCache().getMap(StoragePluginsSerializable.class);
+      StoragePluginsSerializable cachedPluginsSerializable = map.get("storage-plugins");
+      if (cachedPluginsSerializable != null) {
+        cachedPlugins = cachedPluginsSerializable.getObj();
+        logger.debug("Found cached storage plugin config: {}", cachedPlugins);
+      } else {
+        Preconditions.checkNotNull(plugins,"No storage plugin configuration found");
+        logger.debug("caching storage plugin config {}", plugins);
+        map.put("storage-plugins", new StoragePluginsSerializable(context, plugins));
+        cachedPluginsSerializable = map.get("storage-plugins");
+        cachedPlugins = cachedPluginsSerializable.getObj();
+      }
+      if(!(plugins == null || cachedPlugins.equals(plugins))) {
+        logger.error("Storage plugin config mismatch. {}. {}", plugins, cachedPlugins);
+        throw new DrillbitStartupException("Storage plugin config mismatch");
+      }
+      logger.debug("using plugin config: {}", cachedPlugins);
     }catch(IOException e){
-      throw new IllegalStateException("Failure while reading storage engines data.", e);
+      throw new IllegalStateException("Failure while reading storage plugins data.", e);
     }
-
-    for(Map.Entry<String, StoragePluginConfig> config : engines){
+    
+    for(Map.Entry<String, StoragePluginConfig> config : cachedPlugins){
       try{
         StoragePlugin plugin = create(config.getKey(), config.getValue());
-        activeEngines.put(config.getKey(), plugin);
+        activePlugins.put(config.getKey(), plugin);
       }catch(ExecutionSetupException e){
         logger.error("Failure while setting up StoragePlugin with name: '{}'.", config.getKey(), e);
       }
     }
-    activeEngines.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
-
-    return activeEngines;
+    activePlugins.put("INFORMATION_SCHEMA", new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, "INFORMATION_SCHEMA"));
+    
+    return activePlugins;
   }
 
-  public StoragePlugin getEngine(String registeredStorageEngineName) throws ExecutionSetupException {
-    return engines.get(registeredStorageEngineName);
+  public StoragePlugin getPlugin(String registeredStoragePluginName) throws ExecutionSetupException {
+    return plugins.get(registeredStoragePluginName);
   }
-
-  public StoragePlugin getEngine(StoragePluginConfig config) throws ExecutionSetupException {
+  
+  public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException {
     if(config instanceof NamedStoragePluginConfig){
-      return engines.get(((NamedStoragePluginConfig) config).name);
+      return plugins.get(((NamedStoragePluginConfig) config).name);
     }else{
       // TODO: for now, we'll throw away transient configs.  we really ought to clean these up.
       return create(null, config);
@@ -132,33 +163,33 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
   }
 
   public FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException{
-    StoragePlugin p = getEngine(storageConfig);
-    if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a stroage engine that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
+    StoragePlugin p = getPlugin(storageConfig);
+    if(!(p instanceof FileSystemPlugin)) throw new ExecutionSetupException(String.format("You tried to request a format plugin for a storage plugin that wasn't of type FileSystemPlugin.  The actual type of plugin was %s.", p.getClass().getName()));
     FileSystemPlugin storage = (FileSystemPlugin) p;
     return storage.getFormatPlugin(formatConfig);
   }
 
-  private StoragePlugin create(String name, StoragePluginConfig engineConfig) throws ExecutionSetupException {
-    StoragePlugin engine = null;
-    Constructor<? extends StoragePlugin> c = availableEngines.get(engineConfig.getClass());
+  private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException {
+    StoragePlugin plugin = null;
+    Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass());
     if (c == null)
-      throw new ExecutionSetupException(String.format("Failure finding StorageEngine constructor for config %s",
-          engineConfig));
+      throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s",
+          pluginConfig));
     try {
-      engine = c.newInstance(engineConfig, context, name);
-      return engine;
+      plugin = c.newInstance(pluginConfig, context, name);
+      return plugin;
     } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
       Throwable t = e instanceof InvocationTargetException ? ((InvocationTargetException) e).getTargetException() : e;
       if (t instanceof ExecutionSetupException)
         throw ((ExecutionSetupException) t);
       throw new ExecutionSetupException(String.format(
-          "Failure setting up new storage engine configuration for config %s", engineConfig), t);
+          "Failure setting up new storage plugin configuration for config %s", pluginConfig), t);
     }
   }
 
   @Override
   public Iterator<Entry<String, StoragePlugin>> iterator() {
-    return engines.entrySet().iterator();
+    return plugins.entrySet().iterator();
   }
 
   public DrillSchemaFactory getSchemaFactory(){
@@ -169,7 +200,7 @@ public class StoragePluginRegistry implements Iterable<Map.Entry<String, Storage
 
     @Override
     public void registerSchemas(DrillUser user, SchemaPlus parent) {
-      for(Map.Entry<String, StoragePlugin> e : engines.entrySet()){
+      for(Map.Entry<String, StoragePlugin> e : plugins.entrySet()){
         e.getValue().registerSchemas(user, parent);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
index 50678a6..232ec07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/BasicFormatMatcher.java
@@ -18,11 +18,13 @@
 package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +33,8 @@ import com.beust.jcommander.internal.Lists;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Range;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 public class BasicFormatMatcher extends FormatMatcher{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicFormatMatcher.class);
@@ -39,6 +43,8 @@ public class BasicFormatMatcher extends FormatMatcher{
   private final MagicStringMatcher matcher;
   protected final DrillFileSystem fs;
   protected final FormatPlugin plugin;
+  protected final boolean compressible;
+  protected final CompressionCodecFactory codecFactory;
   
   public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<Pattern> patterns, List<MagicString> magicStrings) {
     super();
@@ -46,12 +52,21 @@ public class BasicFormatMatcher extends FormatMatcher{
     this.matcher = new MagicStringMatcher(magicStrings);
     this.fs = fs;
     this.plugin = plugin;
+    this.compressible = false;
+    this.codecFactory = null;
   }
   
-  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, String extension){
-    this(plugin, fs, //
-        Lists.newArrayList(Pattern.compile(".*\\." + extension)), //
-        (List<MagicString>) Collections.EMPTY_LIST);
+  public BasicFormatMatcher(FormatPlugin plugin, DrillFileSystem fs, List<String> extensions, boolean compressible){
+    List<Pattern> patterns = Lists.newArrayList();
+    for (String extension : extensions) {
+      patterns.add(Pattern.compile(".*\\." + extension));
+    }
+    this.patterns = patterns;
+    this.matcher = new MagicStringMatcher(new ArrayList<MagicString>());
+    this.fs = fs;
+    this.plugin = plugin;
+    this.compressible = compressible;
+    this.codecFactory = new CompressionCodecFactory(fs.getUnderlying().getConf());
   }
   
   @Override
@@ -62,14 +77,31 @@ public class BasicFormatMatcher extends FormatMatcher{
   @Override
   public FormatSelection isReadable(FileSelection selection) throws IOException {
     if(isReadable(selection.getFirstPath(fs))){
-      return new FormatSelection(plugin.getConfig(), selection);
+      if (plugin.getName() != null) {
+        NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+        namedConfig.name = plugin.getName();
+        return new FormatSelection(namedConfig, selection);
+      } else {
+        return new FormatSelection(plugin.getConfig(), selection);
+      }
     }
     return null;
   }
 
   protected final boolean isReadable(FileStatus status) throws IOException {
+    CompressionCodec codec = null;
+    if (compressible) {
+      codec = codecFactory.getCodec(status.getPath());
+    }
+    String fileName;
+    if (codec != null) {
+      String path = status.getPath().toString();
+      fileName = path.substring(0, path.lastIndexOf('.'));
+    } else {
+      fileName = status.getPath().toString();
+    }
     for(Pattern p : patterns){
-      if(p.matcher(status.getPath().toString()).matches()){
+      if(p.matcher(fileName).matches()){
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
index 455c4b2..e392fa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemConfig.java
@@ -31,5 +31,16 @@ public class FileSystemConfig implements StoragePluginConfig{
   public String connection;
   public Map<String, String> workspaces;
   public Map<String, FormatPluginConfig> formats;
-  
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof FileSystemConfig)) {
+      return false;
+    }
+    FileSystemConfig that = (FileSystemConfig) obj;
+    boolean same = ((this.connection == null && that.connection == null) || this.connection.equals(that.connection)) &&
+            ((this.workspaces == null && that.workspaces == null) || this.workspaces.equals(that.workspaces)) &&
+            ((this.formats== null && that.formats == null) || this.formats.equals(that.formats));
+    return same;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index ebd8507..840a011 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.rpc.user.UserSession;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.ClassPathFileSystem;
+import org.apache.drill.exec.store.LocalSyncableFileSystem;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.shim.FileSystemCreator;
 import org.apache.hadoop.conf.Configuration;
@@ -69,6 +70,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
       Configuration fsConf = new Configuration();
       fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, config.connection);
       fsConf.set("fs.classpath.impl", ClassPathFileSystem.class.getName());
+      fsConf.set("fs.drill-local.impl", LocalSyncableFileSystem.class.getName());
       this.fs = FileSystemCreator.getFileSystem(context.getConfig(), fsConf);
       this.formatsByName = FormatCreator.getFormatPlugins(context, fs, config);
       List<FormatMatcher> matchers = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
index 7ce8c50..b40502f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatCreator.java
@@ -37,7 +37,7 @@ public class FormatCreator {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FormatCreator.class);
   
   
-  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPlugin.class);
+  static final ConstructorChecker FORMAT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class, FormatPluginConfig.class);
   static final ConstructorChecker DEFAULT_BASED = new ConstructorChecker(String.class, DrillbitContext.class, DrillFileSystem.class, StoragePluginConfig.class);
   
   static Map<String, FormatPlugin> getFormatPlugins(DrillbitContext context, DrillFileSystem fileSystem, FileSystemConfig storageConfig){
@@ -69,7 +69,8 @@ public class FormatCreator {
         for(Constructor<?> c : pluginClass.getConstructors()){
           try{
             if(!FORMAT_BASED.check(c)) continue;
-            constructors.put(pluginClass, c);
+            Class<? extends FormatPluginConfig> configClass = (Class<? extends FormatPluginConfig>) c.getParameterTypes()[4];
+            constructors.put(configClass, c);
           }catch(Exception e){
             logger.warn(String.format("Failure while trying instantiate FormatPlugin %s.", pluginClass.getName()), e);
           }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
index 6b98ea2..173dfeb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/NamedFormatPluginConfig.java
@@ -17,12 +17,13 @@
  */
 package org.apache.drill.exec.store.dfs;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.drill.common.logical.FormatPluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 
 @JsonTypeName("named")
-public class NamedFormatPluginConfig implements FormatPluginConfig{
+public class NamedFormatPluginConfig implements FormatPluginConfig {
   public String name;
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 780ec14..9c1dc74 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -40,6 +40,10 @@ import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
 
 import com.beust.jcommander.internal.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
@@ -51,20 +55,24 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
   private final boolean blockSplittable;
   private final DrillFileSystem fs;
   private final StoragePluginConfig storageConfig;
-  private final FormatPluginConfig formatConfig;
+  protected final FormatPluginConfig formatConfig;
   private final String name;
+  protected final CompressionCodecFactory codecFactory;
+  private final boolean compressible;
   
   protected EasyFormatPlugin(String name, DrillbitContext context, DrillFileSystem fs, StoragePluginConfig storageConfig,
-                             T formatConfig, boolean readable, boolean writable, boolean blockSplittable, String extension, String defaultName){
-    this.matcher = new BasicFormatMatcher(this, fs, extension);
+                             T formatConfig, boolean readable, boolean writable, boolean blockSplittable, boolean compressible, List<String> extensions, String defaultName){
+    this.matcher = new BasicFormatMatcher(this, fs, extensions, compressible);
     this.readable = readable;
     this.writable = writable;
     this.context = context;
     this.blockSplittable = blockSplittable;
+    this.compressible = compressible;
     this.fs = fs;
     this.storageConfig = storageConfig;
     this.formatConfig = formatConfig;
-    this.name = name == null ? defaultName : name; 
+    this.name = name == null ? defaultName : name;
+    this.codecFactory = new CompressionCodecFactory(new Configuration(fs.getUnderlying().getConf()));
   }
   
   @Override
@@ -88,9 +96,13 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
    * 
    * @return True if splittable.
    */
-  public boolean isBlockSplittable(){
+  public boolean isBlockSplittable() {
     return blockSplittable;
-  };
+  }
+
+  public boolean isCompressible() {
+    return compressible;
+  }
 
   public abstract RecordReader getRecordReader(FragmentContext context, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 6015865..fc2ae2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -32,8 +32,11 @@ import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.Size;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
@@ -70,11 +73,19 @@ public class EasyGroupScan extends AbstractGroupScan{
       @JacksonInject StoragePluginRegistry engineRegistry, // 
       @JsonProperty("columns") List<SchemaPath> columns
       ) throws IOException, ExecutionSetupException {
-    
+
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.selection = new FileSelection(files, true);
-    this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
+    try{
+      BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+      this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+      this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+    }catch(IOException e){
+      logger.warn("Failure determining endpoint affinity.", e);
+      this.endpointAffinities = Collections.emptyList();
+    }
+    maxWidth = chunks.size();
     this.columns = columns;
   }
   
@@ -84,9 +95,17 @@ public class EasyGroupScan extends AbstractGroupScan{
       List<SchemaPath> columns
       ) throws IOException{
     this.selection = selection;
-    this.maxWidth = selection.getFileStatusList(formatPlugin.getFileSystem()).size();
     this.formatPlugin = formatPlugin;
     this.columns = columns;
+    try{
+      BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+      this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+      this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+    }catch(IOException e){
+      logger.warn("Failure determining endpoint affinity.", e);
+      this.endpointAffinities = Collections.emptyList();
+    }
+    maxWidth = chunks.size();
   }
 
   @Override
@@ -127,15 +146,10 @@ public class EasyGroupScan extends AbstractGroupScan{
   
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
+    assert chunks != null && chunks.size() > 0;
     if (this.endpointAffinities == null) {
-      try{
-      BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
-      this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
-      this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
-      }catch(IOException e){
-        logger.warn("Failure determining endpoint affinity.", e);
-        this.endpointAffinities = Collections.emptyList();
-      }
+        logger.debug("chunks: {}", chunks.size());
+        this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
     }
     return this.endpointAffinities;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/54287d07/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 6631a6a..c01fb84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -25,8 +25,10 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.physical.base.AbstractSubScan;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
 import org.apache.drill.exec.store.schedule.CompleteFileWork.FileWorkImpl;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -52,7 +54,7 @@ public class EasySubScan extends AbstractSubScan{
       @JacksonInject StoragePluginRegistry engineRegistry, // 
       @JsonProperty("columns") List<SchemaPath> columns //
       ) throws IOException, ExecutionSetupException {
-    
+
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
     this.files = files;
@@ -82,7 +84,13 @@ public class EasySubScan extends AbstractSubScan{
 
   @JsonProperty("format")
   public FormatPluginConfig getFormatConfig(){
-    return formatPlugin.getConfig();
+    if (formatPlugin.getName() != null) {
+      NamedFormatPluginConfig namedConfig = new NamedFormatPluginConfig();
+      namedConfig.name = formatPlugin.getName();
+      return namedConfig;
+    } else {
+      return formatPlugin.getConfig();
+    }
   }
   
   @JsonProperty("columns")