You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/11 05:51:59 UTC

[14/61] [abbrv] git commit: Enable View persistence, Storage Plugin and System option persistence.

Enable View persistence, Storage Plugin and System option persistence.

Conflicts:
	exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
	exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/872d0abf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/872d0abf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/872d0abf

Branch: refs/heads/master
Commit: 872d0abf5218ea7da6a22f42a5917e8a88447e26
Parents: 37a4776
Author: Jacques Nadeau <ja...@apache.org>
Authored: Thu May 29 16:44:32 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Jun 8 19:13:04 2014 -0700

----------------------------------------------------------------------
 .../drill/common/expression/parser/ExprParser.g |    2 +
 .../drill/common/logical/PlanProperties.java    |   13 +-
 .../resources/bootstrap-storage-plugins.json    |   11 +
 .../src/test/resources/storage-plugins.json     |   11 -
 .../resources/bootstrap-storage-plugins.json    |   70 +
 distribution/src/resources/storage-plugins.json |   70 -
 exec/java-exec/pom.xml                          |   20 +-
 .../org/apache/drill/exec/ExecConstants.java    |    2 +
 .../exec/cache/AbstractDataSerializable.java    |   33 -
 .../drill/exec/cache/DistributedCache.java      |  165 +-
 .../apache/drill/exec/cache/DistributedMap.java |   13 +-
 .../exec/cache/DistributedMapDeserializer.java  |   28 -
 .../drill/exec/cache/DistributedMultiMap.java   |    6 +-
 .../exec/cache/JacksonAdvancedSerializer.java   |   65 -
 .../exec/cache/JacksonDrillSerializable.java    |   77 -
 .../drill/exec/cache/JacksonSerializable.java   |   68 -
 .../drill/exec/cache/ProtoSerializable.java     |   90 -
 .../exec/cache/ProtobufDrillSerializable.java   |   73 -
 .../exec/cache/SerializationDefinition.java     |    7 +-
 .../cache/VectorAccessibleSerializable.java     |    2 +-
 .../hazel/HCVectorAccessibleSerializer.java     |   58 -
 .../drill/exec/cache/hazel/HazelCache.java      |  260 --
 .../drill/exec/cache/hazel/ProtoBufImpl.java    |   49 -
 .../drill/exec/cache/hazel/ProtoBufWrap.java    |   67 -
 .../apache/drill/exec/cache/hazel/ProtoMap.java |   52 -
 .../drill/exec/cache/infinispan/ICache.java     |  172 +-
 .../drill/exec/cache/local/LocalCache.java      |  271 +-
 .../apache/drill/exec/client/DrillClient.java   |    2 +-
 .../drill/exec/client/QuerySubmitter.java       |    2 +-
 .../drill/exec/coord/ClusterCoordinator.java    |    6 +-
 .../drill/exec/coord/DistributedSemaphore.java  |   26 +
 .../exec/coord/DrillServiceInstanceHelper.java  |   13 +-
 .../exec/coord/LocalClusterCoordinator.java     |   96 -
 .../drill/exec/coord/ZKClusterCoordinator.java  |  198 --
 .../drill/exec/coord/ZKRegistrationHandle.java  |   32 -
 .../coord/local/LocalClusterCoordinator.java    |  139 +
 .../exec/coord/zk/ZKClusterCoordinator.java     |  208 ++
 .../exec/coord/zk/ZKRegistrationHandle.java     |   33 +
 .../exec/coord/zk/ZkDistributedSemaphore.java   |   61 +
 .../drill/exec/dotdrill/DotDrillFile.java       |   58 +
 .../drill/exec/dotdrill/DotDrillType.java       |   52 +
 .../drill/exec/dotdrill/DotDrillUtil.java       |   61 +
 .../org/apache/drill/exec/dotdrill/View.java    |  146 +
 .../org/apache/drill/exec/ops/QueryContext.java |   10 +-
 .../OrderedPartitionRecordBatch.java            |   23 +-
 .../exec/physical/impl/xsort/BatchGroup.java    |    2 +-
 .../exec/planner/logical/DrillViewTable.java    |   53 +-
 .../exec/planner/logical/StoragePlugins.java    |    3 +-
 .../planner/sql/ExpandingConcurrentMap.java     |   23 +-
 .../exec/planner/sql/handlers/ViewHandler.java  |   19 +-
 .../apache/drill/exec/record/WritableBatch.java |    4 +-
 .../drill/exec/rpc/control/WorkEventBus.java    |   15 +-
 .../apache/drill/exec/rpc/user/UserSession.java |    3 -
 .../apache/drill/exec/rpc/user/ViewStore.java   |   93 -
 .../org/apache/drill/exec/server/Drillbit.java  |   16 +-
 .../drill/exec/server/DrillbitContext.java      |   12 +-
 .../drill/exec/server/RemoteServiceSet.java     |    4 +-
 .../drill/exec/server/options/OptionValue.java  |    8 +-
 .../server/options/SystemOptionManager.java     |   39 +-
 .../drill/exec/server/rest/DrillRoot.java       |   14 +-
 .../drill/exec/store/StoragePluginRegistry.java |  102 +-
 .../drill/exec/store/dfs/FileSystemPlugin.java  |    8 +-
 .../exec/store/dfs/WorkspaceSchemaFactory.java  |  127 +-
 .../org/apache/drill/exec/store/sys/PTable.java |   27 +
 .../drill/exec/store/sys/PTableConfig.java      |   77 +
 .../drill/exec/store/sys/TableProvider.java     |   26 +
 .../drill/exec/store/sys/local/LocalTable.java  |  179 +
 .../store/sys/local/LocalTableProvider.java     |   61 +
 .../exec/store/sys/local/NoWriteLocalTable.java |   63 +
 .../store/sys/serialize/JacksonSerializer.java  |   46 +
 .../store/sys/serialize/PClassSerializer.java   |   25 +
 .../store/sys/serialize/ProtoSerializer.java    |   55 +
 .../drill/exec/store/sys/zk/ZkPTable.java       |  182 ++
 .../exec/store/sys/zk/ZkTableProvider.java      |   50 +
 .../org/apache/drill/exec/work/WorkManager.java |    5 +-
 .../exec/work/batch/SpoolingRawBatchBuffer.java |    1 +
 .../apache/drill/exec/work/foreman/Foreman.java |   12 +-
 .../drill/exec/work/foreman/QueryStatus.java    |   14 +-
 .../src/main/resources/drill-module.conf        |   12 +-
 .../java/org/apache/drill/PlanningBase.java     |    8 +-
 .../apache/drill/exec/DrillSystemTestBase.java  |   44 +-
 .../apache/drill/exec/TestWithZookeeper.java    |   81 +
 .../java/org/apache/drill/exec/cache/ISpan.java |   94 -
 .../exec/cache/TestCacheSerialization.java      |   68 +-
 .../exec/client/DrillClientSystemTest.java      |    6 +-
 .../exec/physical/impl/TestOptiqPlans.java      |    3 +-
 .../drill/exec/store/TestOrphanSchema.java      |   85 -
 .../drill/exec/store/sys/PTableTestUtil.java    |   66 +
 .../exec/store/sys/TestTableProviders.java      |   58 +
 .../resources/bootstrap-storage-plugins.json    |   54 +
 .../src/test/resources/drill-module.conf        |    2 +-
 .../src/test/resources/storage-plugins.json     |   54 -
 .../apache/drill/jdbc/test/TestJdbcQuery.java   |    9 +-
 .../resources/bootstrap-storage-plugins.json    |   54 +
 .../src/test/resources/storage-plugins.json     |   54 -
 pom.xml                                         |    2 +-
 .../drill/exec/proto/BitControlHandshake.java   |  206 ++
 .../org/apache/drill/exec/proto/BitStatus.java  |  174 +
 .../apache/drill/exec/proto/FragmentStatus.java |  187 ++
 .../apache/drill/exec/proto/PlanFragment.java   |  466 +++
 .../org/apache/drill/exec/proto/RpcType.java    |   65 +
 .../drill/exec/proto/WorkQueueStatus.java       |  206 ++
 protocol/output                                 | 3076 ++++++++++++++++++
 protocol/pom.xml                                |   93 +-
 .../org/apache/drill/common/types/DataMode.java |   51 +
 .../apache/drill/common/types/MajorType.java    |  273 ++
 .../apache/drill/common/types/MinorType.java    |  117 +
 .../drill/common/types/SchemaTypeProtos.java    |  173 +
 .../org/apache/drill/exec/proto/BitControl.java |   60 +-
 .../org/apache/drill/exec/proto/ExecProtos.java |  643 +++-
 .../drill/exec/proto/SchemaBitControl.java      |  733 +++++
 .../apache/drill/exec/proto/SchemaBitData.java  |  415 +++
 .../exec/proto/SchemaCoordinationProtos.java    |  434 +++
 .../drill/exec/proto/SchemaExecProtos.java      |  272 ++
 .../exec/proto/SchemaGeneralRPCProtos.java      |  524 +++
 .../drill/exec/proto/SchemaSchemaDefProtos.java |   26 +
 .../drill/exec/proto/SchemaUserBitShared.java   | 1801 ++++++++++
 .../drill/exec/proto/SchemaUserProtos.java      | 1064 ++++++
 .../apache/drill/exec/proto/UserBitShared.java  |  190 +-
 .../org/apache/drill/exec/proto/beans/Ack.java  |  163 +
 .../exec/proto/beans/BitClientHandshake.java    |  209 ++
 .../exec/proto/beans/BitControlHandshake.java   |  209 ++
 .../exec/proto/beans/BitServerHandshake.java    |  163 +
 .../drill/exec/proto/beans/BitStatus.java       |  175 +
 .../exec/proto/beans/BitToUserHandshake.java    |  163 +
 .../exec/proto/beans/CompleteRpcMessage.java    |  210 ++
 .../exec/proto/beans/CoreOperatorType.java      |  107 +
 .../drill/exec/proto/beans/DrillPBError.java    |  265 ++
 .../exec/proto/beans/DrillServiceInstance.java  |  209 ++
 .../exec/proto/beans/DrillbitEndpoint.java      |  253 ++
 .../drill/exec/proto/beans/FragmentHandle.java  |  209 ++
 .../exec/proto/beans/FragmentRecordBatch.java   |  278 ++
 .../drill/exec/proto/beans/FragmentState.java   |   57 +
 .../drill/exec/proto/beans/FragmentStatus.java  |  189 ++
 .../exec/proto/beans/MajorFragmentProfile.java  |  197 ++
 .../drill/exec/proto/beans/MetricValue.java     |  207 ++
 .../exec/proto/beans/MinorFragmentProfile.java  |  355 ++
 .../apache/drill/exec/proto/beans/NamePart.java |  237 ++
 .../drill/exec/proto/beans/NodeStatus.java      |  185 ++
 .../drill/exec/proto/beans/OperatorProfile.java |  317 ++
 .../drill/exec/proto/beans/ParsingError.java    |  229 ++
 .../drill/exec/proto/beans/PlanFragment.java    |  481 +++
 .../apache/drill/exec/proto/beans/Property.java |  199 ++
 .../apache/drill/exec/proto/beans/QueryId.java  |  185 ++
 .../drill/exec/proto/beans/QueryProfile.java    |  309 ++
 .../drill/exec/proto/beans/QueryResult.java     |  445 +++
 .../exec/proto/beans/QueryResultsMode.java      |   47 +
 .../drill/exec/proto/beans/QueryType.java       |   51 +
 .../drill/exec/proto/beans/RecordBatchDef.java  |  219 ++
 .../drill/exec/proto/beans/RequestResults.java  |  187 ++
 .../apache/drill/exec/proto/beans/Roles.java    |  256 ++
 .../drill/exec/proto/beans/RpcChannel.java      |   51 +
 .../drill/exec/proto/beans/RpcFailure.java      |  229 ++
 .../drill/exec/proto/beans/RpcHeader.java       |  207 ++
 .../apache/drill/exec/proto/beans/RpcMode.java  |   51 +
 .../apache/drill/exec/proto/beans/RpcType.java  |   65 +
 .../apache/drill/exec/proto/beans/RunQuery.java |  207 ++
 .../drill/exec/proto/beans/SerializedField.java |  311 ++
 .../drill/exec/proto/beans/StreamProfile.java   |  207 ++
 .../drill/exec/proto/beans/UserCredentials.java |  163 +
 .../drill/exec/proto/beans/UserProperties.java  |  175 +
 .../exec/proto/beans/UserToBitHandshake.java    |  255 ++
 .../drill/exec/proto/beans/ValueMode.java       |   51 +
 .../drill/exec/proto/beans/ViewPointer.java     |  185 ++
 .../drill/exec/proto/beans/WorkQueueStatus.java |  209 ++
 protocol/src/main/protobuf/BitControl.proto     |    2 +-
 .../src/main/protobuf/ExecutionProtos.proto     |    6 +
 protocol/src/main/protobuf/UserBitShared.proto  |    2 +-
 168 files changed, 23271 insertions(+), 2362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
----------------------------------------------------------------------
diff --git a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
index 9737e5d..ba6d48b 100644
--- a/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
+++ b/common/src/main/antlr3/org/apache/drill/common/expression/parser/ExprParser.g
@@ -39,6 +39,8 @@ import org.apache.drill.common.expression.PathSegment.NameSegment;
 import org.apache.drill.common.expression.PathSegment.ArraySegment;
 import org.apache.drill.common.types.*;
 import org.apache.drill.common.types.TypeProtos.*;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.exceptions.ExpressionParsingException;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
index a2942b6..e8fdebf 100644
--- a/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
+++ b/common/src/main/java/org/apache/drill/common/logical/PlanProperties.java
@@ -33,6 +33,7 @@ public class PlanProperties {
 	public Generator generator;
 	public ResultMode resultMode;
   public JSONOptions options;
+  public int queue;
 
 //	@JsonInclude(Include.NON_NULL)
 	public static class Generator{
@@ -53,9 +54,11 @@ public class PlanProperties {
                          @JsonProperty("generator") Generator generator,
                          @JsonProperty("type") PlanType type,
                          @JsonProperty("mode") ResultMode resultMode,
-                         @JsonProperty("options") JSONOptions options
+                         @JsonProperty("options") JSONOptions options,
+                         @JsonProperty("queue") int queue
                          ) {
     this.version = version;
+    this.queue = queue;
     this.generator = generator;
     this.type = type;
     this.resultMode = resultMode == null ? ResultMode.EXEC : resultMode;
@@ -72,6 +75,7 @@ public class PlanProperties {
     private PlanType type;
     private ResultMode mode = ResultMode.EXEC;
     private JSONOptions options;
+    private int queueNumber = 0;
 
     public PlanPropertiesBuilder type(PlanType type) {
       this.type = type;
@@ -93,6 +97,11 @@ public class PlanProperties {
       return this;
     }
 
+    public PlanPropertiesBuilder queue(int queueNumber){
+      this.queueNumber = queueNumber;
+      return this;
+    }
+
     public PlanPropertiesBuilder options(JSONOptions options){
       this.options = options;
       return this;
@@ -104,7 +113,7 @@ public class PlanProperties {
     }
 
     public PlanProperties build() {
-      return new PlanProperties(version, generator, type, mode, options);
+      return new PlanProperties(version, generator, type, mode, options, queueNumber);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..0e93f7e
--- /dev/null
+++ b/contrib/storage-hbase/src/test/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,11 @@
+{
+  "storage":{
+    hbase : {
+      type:"hbase",
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/contrib/storage-hbase/src/test/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/resources/storage-plugins.json b/contrib/storage-hbase/src/test/resources/storage-plugins.json
deleted file mode 100644
index 0e93f7e..0000000
--- a/contrib/storage-hbase/src/test/resources/storage-plugins.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "storage":{
-    hbase : {
-      type:"hbase",
-      config : {
-        "hbase.zookeeper.quorum" : "localhost",
-        "hbase.zookeeper.property.clientPort" : 2181
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/distribution/src/resources/bootstrap-storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/bootstrap-storage-plugins.json b/distribution/src/resources/bootstrap-storage-plugins.json
new file mode 100644
index 0000000..3b1cbd0
--- /dev/null
+++ b/distribution/src/resources/bootstrap-storage-plugins.json
@@ -0,0 +1,70 @@
+{
+  "storage":{
+    dfs: {
+      type: "file",
+      connection: "file:///",
+      workspaces: {
+        "root" : {
+          location: "/",
+          writable: false
+        },
+        "tmp" : {
+          location: "/tmp",
+          writable: true,
+          storageformat: "csv"
+        }
+      },
+      formats: {
+        "psv" : {
+          type: "text",
+          extensions: [ "tbl" ],
+          delimiter: "|"
+        },
+        "csv" : {
+          type: "text",
+          extensions: [ "csv" ],
+          delimiter: ","
+        },
+        "tsv" : {
+          type: "text",
+          extensions: [ "tsv" ],
+          delimiter: "\t"
+        },
+        "parquet" : {
+          type: "parquet"
+        },
+        "json" : {
+          type: "json"
+        }
+      }
+    },
+    cp: {
+      type: "file",
+      connection: "classpath:///"
+    } 
+
+    /*,
+    hive : {
+        type:"hive",
+        config :
+          {
+            "hive.metastore.uris" : "",
+            "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
+            "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
+            "fs.default.name" : "file:///",
+            "hive.metastore.sasl.enabled" : "false"
+          }
+      }
+      */
+
+    /*,
+    hbase : {
+      type:"hbase",
+      config : {
+        "hbase.zookeeper.quorum" : "localhost",
+        "hbase.zookeeper.property.clientPort" : 2181
+      }
+    }
+    */
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/distribution/src/resources/storage-plugins.json
----------------------------------------------------------------------
diff --git a/distribution/src/resources/storage-plugins.json b/distribution/src/resources/storage-plugins.json
deleted file mode 100644
index 3b1cbd0..0000000
--- a/distribution/src/resources/storage-plugins.json
+++ /dev/null
@@ -1,70 +0,0 @@
-{
-  "storage":{
-    dfs: {
-      type: "file",
-      connection: "file:///",
-      workspaces: {
-        "root" : {
-          location: "/",
-          writable: false
-        },
-        "tmp" : {
-          location: "/tmp",
-          writable: true,
-          storageformat: "csv"
-        }
-      },
-      formats: {
-        "psv" : {
-          type: "text",
-          extensions: [ "tbl" ],
-          delimiter: "|"
-        },
-        "csv" : {
-          type: "text",
-          extensions: [ "csv" ],
-          delimiter: ","
-        },
-        "tsv" : {
-          type: "text",
-          extensions: [ "tsv" ],
-          delimiter: "\t"
-        },
-        "parquet" : {
-          type: "parquet"
-        },
-        "json" : {
-          type: "json"
-        }
-      }
-    },
-    cp: {
-      type: "file",
-      connection: "classpath:///"
-    } 
-
-    /*,
-    hive : {
-        type:"hive",
-        config :
-          {
-            "hive.metastore.uris" : "",
-            "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../../sample-data/drill_hive_db;create=true",
-            "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
-            "fs.default.name" : "file:///",
-            "hive.metastore.sasl.enabled" : "false"
-          }
-      }
-      */
-
-    /*,
-    hbase : {
-      type:"hbase",
-      config : {
-        "hbase.zookeeper.quorum" : "localhost",
-        "hbase.zookeeper.property.clientPort" : 2181
-      }
-    }
-    */
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 83220ab..ff6f552 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -92,10 +92,10 @@
       <version>2.8</version>
     </dependency>
     <dependency>
-        <groupId>org.glassfish.jersey.ext</groupId>
-        <artifactId>jersey-mvc-freemarker</artifactId>
-        <version>2.8</version>
-    </dependency>    
+      <groupId>org.glassfish.jersey.ext</groupId>
+      <artifactId>jersey-mvc-freemarker</artifactId>
+      <version>2.8</version>
+    </dependency>
     <dependency>
       <groupId>net.hydromatic</groupId>
       <artifactId>optiq-core</artifactId>
@@ -190,14 +190,10 @@
       <version>1.30</version>
     </dependency>
     <dependency>
-      <groupId>com.netflix.curator</groupId>
+      <groupId>org.apache.curator</groupId>
       <artifactId>curator-x-discovery</artifactId>
-      <version>1.1.9</version>
+      <version>2.5.0</version>
       <exclusions>
-        <!-- <exclusion> -->
-        <!-- <artifactId>netty</artifactId> -->
-        <!-- <groupId>org.jboss.netty</groupId> -->
-        <!-- </exclusion> -->
         <exclusion>
           <artifactId>slf4j-log4j12</artifactId>
           <groupId>org.slf4j</groupId>
@@ -231,12 +227,12 @@
     <dependency>
       <groupId>org.infinispan</groupId>
       <artifactId>infinispan-core</artifactId>
-      <version>6.0.1.Final</version>
+      <version>6.0.2.Final</version>
     </dependency>
     <dependency>
       <groupId>org.infinispan</groupId>
       <artifactId>infinispan-tree</artifactId>
-      <version>6.0.1.Final</version>
+      <version>6.0.2.Final</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.janino</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 1ece198..5063f63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -74,5 +74,7 @@ public interface ExecConstants {
   public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
   public static final String HTTP_ENABLE = "drill.exec.http.enabled";
   public static final String HTTP_PORT = "drill.exec.http.port";
+  public static final String SYS_TABLES_LOCAL_PATH = "drill.exec.sys.tables.local.path";
+  public static final String SYS_TABLES_LOCAL_ENABLE_WRITE = "drill.exec.sys.tables.local.write";
   public static final String ERROR_ON_MEMORY_LEAK = "drill.exec.debug.error_on_leak";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
deleted file mode 100644
index f7b9eed..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-public abstract class AbstractDataSerializable extends LoopedAbstractDrillSerializable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDataSerializable.class);
-
-  @Override
-  public abstract void read(DataInput input) throws IOException;
-
-  @Override
-  public abstract void write(DataOutput output) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index aa87162..1c5de85 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -17,9 +17,11 @@
  */
 package org.apache.drill.exec.cache;
 
+import java.util.Map;
+
 import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.protobuf.Message;
 
 
 public interface DistributedCache extends AutoCloseable{
@@ -27,13 +29,158 @@ public interface DistributedCache extends AutoCloseable{
 
   public void run() throws DrillbitStartupException;
 
-//  public void updateLocalQueueLength(int length);
-//  public List<WorkQueueStatus> getQueueLengths();
+  public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config);
+  public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config);
 
-  public PlanFragment getFragment(FragmentHandle handle);
-  public void storeFragment(PlanFragment fragment);
-  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz);
-  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz);
-  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz);
   public Counter getCounter(String name);
+
+  public static enum SerializationMode {
+    JACKSON(Object.class),
+    DRILL_SERIALIZIABLE(String.class, DrillSerializable.class),
+    PROTOBUF(String.class, Message.class);
+
+    private final Class<?>[] classes;
+    private SerializationMode(Class<?>... classes){
+      this.classes = classes;
+    }
+
+    public void checkClass(Class<?> classToCheck){
+      for(Class<?> c : classes){
+        if(c.isAssignableFrom(classToCheck)) return;
+      }
+
+      throw new UnsupportedOperationException(String.format("You are trying to serialize the class %s using the serialization mode %s.  This is not allowed.", classToCheck.getName(), this.name()));
+    }
+  }
+
+  public static class CacheConfig<K, V>{
+    private final Class<K> keyClass;
+    private final Class<V> valueClass;
+    private final String name;
+    private final SerializationMode mode;
+
+    public CacheConfig(Class<K> keyClass, Class<V> valueClass, String name, SerializationMode mode) {
+      super();
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.name = name;
+      this.mode = mode;
+    }
+
+    public Class<K> getKeyClass() {
+      return keyClass;
+    }
+
+    public Class<V> getValueClass() {
+      return valueClass;
+    }
+
+    public SerializationMode getMode() {
+      return mode;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((keyClass == null) ? 0 : keyClass.hashCode());
+      result = prime * result + ((mode == null) ? 0 : mode.hashCode());
+      result = prime * result + ((name == null) ? 0 : name.hashCode());
+      result = prime * result + ((valueClass == null) ? 0 : valueClass.hashCode());
+      return result;
+    }
+
+    public static <V> CacheConfigBuilder<String, V> newBuilder(Class<V> valueClass) {
+      return newBuilder(String.class, valueClass);
+    }
+
+    public static <K, V> CacheConfigBuilder<K, V> newBuilder(Class<K> keyClass, Class<V> valueClass) {
+      return new CacheConfigBuilder<K, V>(keyClass, valueClass);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj)
+        return true;
+      if (obj == null)
+        return false;
+      if (getClass() != obj.getClass())
+        return false;
+      CacheConfig other = (CacheConfig) obj;
+      if (keyClass == null) {
+        if (other.keyClass != null)
+          return false;
+      } else if (!keyClass.equals(other.keyClass))
+        return false;
+      if (mode != other.mode)
+        return false;
+      if (name == null) {
+        if (other.name != null)
+          return false;
+      } else if (!name.equals(other.name))
+        return false;
+      if (valueClass == null) {
+        if (other.valueClass != null)
+          return false;
+      } else if (!valueClass.equals(other.valueClass))
+        return false;
+      return true;
+    }
+
+
+  }
+
+  public static class CacheConfigBuilder<K, V> {
+
+    private Class<K> keyClass;
+    private Class<V> valueClass;
+    private String name;
+    private SerializationMode mode = SerializationMode.DRILL_SERIALIZIABLE;
+
+    private CacheConfigBuilder(Class<K> keyClass, Class<V> valueClass) {
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+      this.name = keyClass.getName();
+    }
+
+
+    public CacheConfigBuilder<K, V> mode(SerializationMode mode){
+      this.mode = mode;
+      return this;
+    }
+
+    public CacheConfigBuilder<K, V> proto(){
+      this.mode = SerializationMode.PROTOBUF;
+      return this;
+    }
+
+    public CacheConfigBuilder<K, V> jackson(){
+      this.mode = SerializationMode.JACKSON;
+      return this;
+    }
+
+    public CacheConfigBuilder<K, V> drill(){
+      this.mode = SerializationMode.DRILL_SERIALIZIABLE;
+      return this;
+    }
+
+
+    public CacheConfigBuilder<K, V> name(String name){
+      this.name = name;
+      return this;
+    }
+
+    public CacheConfig<K, V> build(){
+      mode.checkClass(keyClass);
+      mode.checkClass(valueClass);
+      return new CacheConfig<K, V>(keyClass, valueClass, name, mode);
+    }
+
+
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
index 1bee5fc..2411434 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMap.java
@@ -20,10 +20,13 @@ package org.apache.drill.exec.cache;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-public interface DistributedMap<V extends DrillSerializable> extends Iterable<Map.Entry<String, V>>{
+public interface DistributedMap<K, V>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMap.class);
-  public V get(String key);
-  public void put(String key, V value);
-  public void putIfAbsent(String key, V value);
-  public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit);
+  public V get(K key);
+  public void put(K key, V value);
+  public void delete(K key);
+  public void putIfAbsent(K key, V value);
+  public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit);
+  public Iterable<Map.Entry<K, V>> getLocalEntries();
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
deleted file mode 100644
index 453dbfb..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMapDeserializer.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-public interface DistributedMapDeserializer<V extends DrillSerializable> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMapDeserializer.class);
-
-  public V put(String key, byte[] value) throws IOException;
-
-  public Class getValueClass();
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
index 645fb7c..bf06646 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedMultiMap.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.cache;
 
 import java.util.Collection;
 
-public interface DistributedMultiMap<V extends DrillSerializable> {
+public interface DistributedMultiMap<K, V> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedMultiMap.class);
-  public Collection<V> get(String key);
-  public void put(String key, V value);
+  public Collection<V> get(K key);
+  public void put(K key, V value);
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
deleted file mode 100644
index edf31aa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonAdvancedSerializer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-import org.apache.drill.common.util.DataInputInputStream;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class JacksonAdvancedSerializer<T> implements StreamSerializer<T>  {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonAdvancedSerializer.class);
-
-  private final Class<?> clazz;
-  private final ObjectMapper mapper;
-  private final int id;
-
-  public JacksonAdvancedSerializer(SerializationDefinition def, ObjectMapper mapper){
-    this.clazz =  def.clazz;
-    this.mapper = mapper;
-    this.id = def.id;
-  }
-
-  @Override
-  public int getTypeId() {
-    return id;
-  }
-
-  @Override
-  public void destroy() {
-  }
-
-  @Override
-  public void write(ObjectDataOutput out, T object) throws IOException {
-    out.write(mapper.writeValueAsBytes(object));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public T read(ObjectDataInput in) throws IOException {
-    return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
-  }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
deleted file mode 100644
index 617c356..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.planner.logical.StoragePlugins;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-public abstract class JacksonDrillSerializable<T> extends LoopedAbstractDrillSerializable implements DrillSerializable{
-  private ObjectMapper mapper;
-  private T obj;
-  private Class<T> clazz;
-
-  public JacksonDrillSerializable(DrillbitContext context, T obj, Class<T> clazz) {
-    this(clazz);
-    this.mapper = context.getConfig().getMapper();
-    this.obj = obj;
-  }
-
-  public JacksonDrillSerializable(Class<T> clazz) {
-    this.clazz = clazz;
-  }
-
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    mapper = DrillConfig.create().getMapper();
-    obj = (T) mapper.readValue(input, clazz);
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    output.write(mapper.writeValueAsBytes(obj));
-  }
-
-  public T getObj() {
-    return obj;
-  }
-
-  public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
-
-    public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
-      super(context, obj, StoragePlugins.class);
-    }
-
-    public StoragePluginsSerializable(BufferAllocator allocator) {
-      super(StoragePlugins.class);
-    }
-
-    public StoragePluginsSerializable() {
-      super(StoragePlugins.class);
-    }
-
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
deleted file mode 100644
index 247c79e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonSerializable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.OutputStream;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-
-public abstract class JacksonSerializable implements DrillSerializable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializable.class);
-
-  private void fail(){
-    throw new UnsupportedOperationException("Need to register serialization config for class " + this.getClass().getName()); // rely on external serializer
-
-  }
-
-  @Override
-  public void writeExternal(ObjectOutput out) throws IOException {
-    fail();
-  }
-
-  @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-    fail();
-  }
-
-  @Override
-  public void read(DataInput input) throws IOException {
-    fail();
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    fail();
-  }
-
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    fail();
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    fail();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
deleted file mode 100644
index f48aae1..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public abstract class ProtoSerializable<V extends Message> extends AbstractStreamSerializable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoSerializable.class);
-
-  private Parser<V> protoParser;
-  private V obj;
-
-  ProtoSerializable(Parser<V> protoParser, V obj) {
-    super();
-    this.protoParser = protoParser;
-    this.obj = obj;
-  }
-
-  public V getObject(){
-    return obj;
-  }
-
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    obj = protoParser.parseDelimitedFrom(input);
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    obj.writeDelimitedTo(output);
-  }
-
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + ((obj == null) ? 0 : obj.hashCode());
-    return result;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    ProtoSerializable other = (ProtoSerializable) obj;
-    if (this.obj == null) {
-      if (other.obj != null)
-        return false;
-    } else if (!this.obj.equals(other.obj))
-      return false;
-    return true;
-  }
-
-  public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{
-    public PlanFragmentSerializable(PlanFragment obj) {super(PlanFragment.PARSER, obj);}
-    public PlanFragmentSerializable(){this(null);}
-  }
-  public static class FragmentHandleSerializable extends ProtoSerializable<FragmentHandle>{
-    public FragmentHandleSerializable(FragmentHandle obj) {super(FragmentHandle.PARSER, obj);}
-    public FragmentHandleSerializable(){this(null);}
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
deleted file mode 100644
index 9d6b645..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtobufDrillSerializable.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-
-import com.google.protobuf.Message;
-import com.google.protobuf.Parser;
-
-public abstract class ProtobufDrillSerializable<T extends Message> extends LoopedAbstractDrillSerializable implements DrillSerializable{
-  private Parser<T> parser;
-  private T obj;
-
-  public ProtobufDrillSerializable(T obj){
-    this.parser = (Parser<T>) obj.getParserForType();
-    this.obj = obj;
-  }
-
-  public ProtobufDrillSerializable(Parser<T> parser) {
-    this.parser = parser;
-  }
-
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    obj = parser.parseDelimitedFrom(input);
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    obj.writeDelimitedTo(output);
-  }
-
-  public T getObj() {
-    return obj;
-  }
-
-  public static class CQueryProfile extends ProtobufDrillSerializable<QueryProfile>{
-
-    public CQueryProfile(BufferAllocator allocator){
-      super(QueryProfile.PARSER);
-    }
-
-    public CQueryProfile() {
-      super(QueryProfile.PARSER);
-
-    }
-
-    public CQueryProfile(QueryProfile obj) {
-      super(obj);
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
index 711ddf1..a5df73a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/SerializationDefinition.java
@@ -19,14 +19,19 @@ package org.apache.drill.exec.cache;
 
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.server.options.OptionValue;
 
 public enum SerializationDefinition {
 
   OPTION(3002, OptionValue.class),
   STORAGE_PLUGINS(3003, StoragePlugins.class),
-  FRAGMENT_STATUS(3004, FragmentStatus.class)
+  FRAGMENT_STATUS(3004, FragmentStatus.class),
+  FRAGMENT_HANDLE(3005, FragmentHandle.class),
+  PLAN_FRAGMENT(3006, PlanFragment.class)
   ;
+
   public final int id;
   public final Class<?> clazz;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 63ed592..cff56d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -98,7 +98,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
     VectorContainer container = new VectorContainer();
     UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
-    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+    if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
 
       if (sv2 == null) {
         sv2 = new SelectionVector2(allocator);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
deleted file mode 100644
index bac2323..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
-
-  private BufferAllocator allocator;
-
-  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
-    this.allocator = allocator;
-  }
-
-  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
-    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
-    va.readFromStream(DataInputInputStream.constructInputStream(in));
-    return va;
-  }
-
-  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
-    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
-  }
-
-  public void destroy() {}
-
-  public int getTypeId() {
-    return 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
deleted file mode 100644
index f83456b..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.Counter;
-import org.apache.drill.exec.cache.DistributedCache;
-import org.apache.drill.exec.cache.DistributedMap;
-import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.DrillSerializable;
-import org.apache.drill.exec.cache.JacksonAdvancedSerializer;
-import org.apache.drill.exec.cache.SerializationDefinition;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
-import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HWorkQueueStatus;
-import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HandlePlan;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.SerializerConfig;
-import com.hazelcast.core.DuplicateInstanceNameException;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class HazelCache implements DistributedCache {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
-
-  private final String instanceName;
-  private HazelcastInstance instance;
-  private ITopic<HWorkQueueStatus> workQueueLengths;
-  private HandlePlan fragments;
-  private Cache<WorkQueueStatus, Integer>  endpoints;
-  private BufferAllocator allocator;
-  private DrillConfig config;
-
-  public HazelCache(DrillConfig config, BufferAllocator allocator) {
-    this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
-    this.allocator = allocator;
-    this.config = config;
-  }
-
-  private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
-    SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
-    c.getSerializationConfig().addSerializerConfig(sc);
-  }
-
-  @SuppressWarnings("rawtypes")
-  private <T> void addJSer(Config c, SerializationDefinition d){
-    SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
-    c.getSerializationConfig().addSerializerConfig(sc);
-  }
-
-
-  private class Listener implements MessageListener<HWorkQueueStatus>{
-
-    @Override
-    public void onMessage(Message<HWorkQueueStatus> wrapped) {
-      logger.debug("Received new queue length message.");
-      endpoints.put(wrapped.getMessageObject().get(), 0);
-    }
-
-  }
-
-  public void run() {
-    Config c = new Config();
-    addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
-    addJSer(c, SerializationDefinition.OPTION);
-    addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
-
-    c.setInstanceName(instanceName);
-    c.getGroupConfig().setName(instanceName);
-    for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
-      logger.debug("Adding interface: {}", s);
-      c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
-    }
-
-    instance = getInstanceOrCreateNew(c);
-    workQueueLengths = instance.getTopic("queue-length");
-    fragments = new HandlePlan(instance);
-    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
-    workQueueLengths.addMessageListener(new Listener());
-  }
-
-  private HazelcastInstance getInstanceOrCreateNew(Config c) {
-    for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
-      if (instance.getName().equals(this.instanceName))
-        return instance;
-    }
-    try {
-    return Hazelcast.newHazelcastInstance(c);
-    } catch (DuplicateInstanceNameException e) {
-      return getInstanceOrCreateNew(c);
-    }
-  }
-
-//  @Override
-//  public void updateLocalQueueLength(int length) {
-//    workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
-//        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
-//  }
-//
-//  @Override
-//  public List<WorkQueueStatus> getQueueLengths() {
-//    return Lists.newArrayList(endpoints.asMap().keySet());
-//  }
-
-  @Override
-  public void close() throws IOException {
-    this.instance.getLifecycleService().shutdown();
-  }
-
-  @Override
-  public PlanFragment getFragment(FragmentHandle handle) {
-    return this.fragments.get(handle);
-  }
-
-  @Override
-  public void storeFragment(PlanFragment fragment) {
-    fragments.put(fragment.getHandle(), fragment);
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
-    com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
-    return new HCDistributedMultiMapImpl<V>(mmap, clazz);
-  }
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
-    return getNamedMap(clazz.getName(), clazz);
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
-    IMap<String, V> imap = this.instance.getMap(name);
-    MapConfig myMapConfig = new MapConfig();
-    myMapConfig.setBackupCount(0);
-    myMapConfig.setReadBackupData(true);
-    instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
-    return new HCDistributedMapImpl<V>(imap);
-  }
-
-  @Override
-  public Counter getCounter(String name) {
-    return new HCCounterImpl(this.instance.getAtomicLong(name));
-  }
-
-
-
-  public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
-    private final IMap<String, V> m;
-
-    public HCDistributedMapImpl(IMap<String, V> m) {
-      this.m = m;
-    }
-
-    public V get(String key) {
-      return m.get(key);
-    }
-
-    public void put(String key, V value) {
-      m.put(key, value);
-    }
-
-    public void putIfAbsent(String key, V value) {
-      m.putIfAbsent(key, value);
-    }
-
-    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
-      m.putIfAbsent(key, value, ttl, timeunit);
-
-    }
-
-    @Override
-    public Iterator<Entry<String, V>> iterator() {
-      return m.entrySet().iterator();
-    }
-
-
-  }
-
-  public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
-    private com.hazelcast.core.MultiMap<String, V> mmap;
-
-    public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
-      this.mmap = mmap;
-    }
-
-    public Collection<V> get(String key) {
-      List<V> list = Lists.newArrayList();
-      for (V v : mmap.get(key)) {
-        list.add(v);
-      }
-      return list;
-    }
-
-    @Override
-    public void put(String key, V value) {
-      mmap.put(key, value);
-    }
-  }
-
-  public static class HCCounterImpl implements Counter {
-    private IAtomicLong n;
-
-    public HCCounterImpl(IAtomicLong n) {
-      this.n = n;
-    }
-
-    public long get() {
-      return n.get();
-    }
-
-    public long incrementAndGet() {
-      return n.incrementAndGet();
-    }
-
-    public long decrementAndGet() {
-      return n.decrementAndGet();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
deleted file mode 100644
index d992aa7..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.hazelcast.core.HazelcastInstance;
-
-public class ProtoBufImpl {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
-  
-  public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
-    public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
-    public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
-  }
-  
-  public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
-    public HFragmentHandle() {super(FragmentHandle.PARSER);}
-    public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
-  }
-  
-  public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
-    public HPlanFragment() {super(PlanFragment.PARSER);}
-    public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
-  }
-  
-  public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
-    public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
-    public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
-    public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
deleted file mode 100644
index 23a4e08..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import java.io.IOException;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
-  
-  T value;
-  final Parser<T> parser;
-  
-  public ProtoBufWrap(Parser<T> parser){
-    this(null, parser);
-  }
-  
-  public ProtoBufWrap(T value, Parser<T> parser){
-    this.value = value;
-    this.parser = parser;
-  }
-  
-  @Override
-  public void readData(ObjectDataInput arg0) throws IOException {
-    int len = arg0.readShort();
-    byte[] b = new byte[len];
-    arg0.readFully(b);
-    this.value = parser.parseFrom(b);
-  }
-
-  @Override
-  public void writeData(ObjectDataOutput arg0) throws IOException {
-    byte[] b = value.toByteArray();
-    if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
-    arg0.writeShort(b.length);
-    arg0.write(b);
-  }
-
-  protected T get() {
-    return value;
-  }
-
-  protected void set(T value) {
-    this.value = value;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
deleted file mode 100644
index 72d793a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache.hazel;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-
-public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
-
-  private IMap<HK, HV> hzMap;
-  
-  public ProtoMap(HazelcastInstance instance, String mapName){
-    hzMap = instance.getMap(mapName);
-  }
-  
-  public V get(K key){
-    Preconditions.checkNotNull(key);
-    HK hk = getNewKey(key);
-    HV hv = hzMap.get(hk);
-    if(hv == null) return null;
-    return hv.get();
-  }
-  
-  public V put(K key, V value){
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-    HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
-    return oldValue == null ? null : oldValue.get();
-  }
-  
-  public abstract HK getNewKey(K key);
-  public abstract HV getNewValue(V key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/872d0abf/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index f56f19a..fb346ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -19,10 +19,10 @@ package org.apache.drill.exec.cache.infinispan;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -31,10 +31,8 @@ import org.apache.drill.exec.cache.Counter;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.cache.DistributedMap;
 import org.apache.drill.exec.cache.DistributedMultiMap;
-import org.apache.drill.exec.cache.DrillSerializable;
 import org.apache.drill.exec.cache.SerializationDefinition;
-import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
-import org.apache.drill.exec.cache.ProtoSerializable.PlanFragmentSerializable;
+import org.apache.drill.exec.cache.local.LocalCache.LocalCounterImpl;
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -46,7 +44,6 @@ import org.infinispan.atomic.DeltaAware;
 import org.infinispan.configuration.cache.CacheMode;
 import org.infinispan.configuration.cache.Configuration;
 import org.infinispan.configuration.cache.ConfigurationBuilder;
-import org.infinispan.configuration.global.GlobalConfiguration;
 import org.infinispan.configuration.global.GlobalConfigurationBuilder;
 import org.infinispan.manager.DefaultCacheManager;
 import org.infinispan.manager.EmbeddedCacheManager;
@@ -57,41 +54,65 @@ import org.jgroups.protocols.COUNTER;
 import org.jgroups.protocols.FRAG2;
 import org.jgroups.stack.ProtocolStack;
 
+import com.google.hive12.common.collect.Maps;
+
 public class ICache implements DistributedCache{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
 
   private EmbeddedCacheManager manager;
   private ForkChannel cacheChannel;
   private final CounterService counters;
-  private final Cache<FragmentHandleSerializable, PlanFragmentSerializable> fragments;
+  private final boolean local;
+  private volatile ConcurrentMap<String, Counter> localCounters;
 
-  public ICache(DrillConfig config, BufferAllocator allocator) throws Exception {
+  public ICache(DrillConfig config, BufferAllocator allocator, boolean local) throws Exception {
     String clusterName = config.getString(ExecConstants.SERVICE_NAME);
-    GlobalConfiguration gc = new GlobalConfigurationBuilder() //
-      .transport() //
+    this.local = local;
+
+    final CacheMode mode = local ? CacheMode.LOCAL : CacheMode.DIST_ASYNC;
+    GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+
+    if(!local){
+      gcb.transport() //
         .defaultTransport() //
-        .clusterName(clusterName) //;
-        //
-      .serialization() //
+        .clusterName(clusterName);
+    }
+
+    gcb.serialization() //
         .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator)) //
         .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.OPTION, config.getMapper())) //
         .addAdvancedExternalizer(new JacksonAdvancedExternalizer<>(SerializationDefinition.STORAGE_PLUGINS, config.getMapper())) //
         .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_STATUS, FragmentStatus.PARSER)) //
+        .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.FRAGMENT_HANDLE, FragmentHandle.PARSER)) //
+        .addAdvancedExternalizer(new ProtobufAdvancedExternalizer<>(SerializationDefinition.PLAN_FRAGMENT, PlanFragment.PARSER)) //
       .build();
 
     Configuration c = new ConfigurationBuilder() //
       .clustering() //
 
-      .cacheMode(CacheMode.DIST_ASYNC) //
+      .cacheMode(mode) //
       .storeAsBinary().enable() //
       .build();
-    this.manager = new DefaultCacheManager(gc, c);
-    JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
-    this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
-    this.fragments = manager.getCache(PlanFragment.class.getName());
-    this.counters = new CounterService(this.cacheChannel);
+    this.manager = new DefaultCacheManager(gcb.build(), c);
+
+    if(!local){
+      JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
+      this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
+      this.counters = new CounterService(this.cacheChannel);
+    }else{
+      this.cacheChannel = null;
+      this.counters = null;
+    }
   }
 
+
+//  @Override
+//  public <K, V> Map<K, V> getSmallAtomicMap(CacheConfig<K, V> config) {
+//    Cache<String, ?> cache = manager.getCache("atomic-maps");
+//    return AtomicMapLookup.getAtomicMap(cache, config.getName());
+//  }
+
+
   @Override
   public void close() throws IOException {
     manager.stop();
@@ -100,45 +121,45 @@ public class ICache implements DistributedCache{
   @Override
   public void run() throws DrillbitStartupException {
     try {
-      cacheChannel.connect("c1");
+      if(local){
+        localCounters = Maps.newConcurrentMap();
+        manager.start();
+      }else{
+        cacheChannel.connect("c1");
+      }
+
     } catch (Exception e) {
       throw new DrillbitStartupException("Failure while trying to set up JGroups.");
     }
   }
 
   @Override
-  public PlanFragment getFragment(FragmentHandle handle) {
-    PlanFragmentSerializable pfs = fragments.get(new FragmentHandleSerializable(handle));
-    if(pfs == null) return null;
-    return pfs.getObject();
-  }
-
-  @Override
-  public void storeFragment(PlanFragment fragment) {
-    fragments.put(new FragmentHandleSerializable(fragment.getHandle()), new PlanFragmentSerializable(fragment));
+  public <K, V> DistributedMultiMap<K, V> getMultiMap(CacheConfig<K, V> config) {
+    Cache<K, DeltaList<V>> cache = manager.getCache(config.getName());
+    return new IMulti<K, V>(cache, config);
   }
 
   @Override
-  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
-    Cache<String, DeltaList<V>> cache = manager.getCache(clazz.getName());
-    return new IMulti<V>(cache, clazz);
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
-    Cache<String, V> c = manager.getCache(name);
-    return new IMap<V>(c);
-  }
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
-    return getNamedMap(clazz.getName(), clazz);
+  public <K, V> DistributedMap<K, V> getMap(CacheConfig<K, V> config) {
+    Cache<K, V> c = manager.getCache(config.getName());
+    return new IMap<K, V>(c, config);
   }
 
   @Override
   public Counter getCounter(String name) {
-    return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+    if(local){
+        Counter c = localCounters.get(name);
+        if (c == null) {
+          localCounters.putIfAbsent(name, new LocalCounterImpl());
+          return localCounters.get(name);
+        } else {
+          return c;
+        }
+
+    }else{
+      return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+    }
+
   }
 
   private class JGroupsCounter implements Counter{
@@ -166,63 +187,68 @@ public class ICache implements DistributedCache{
 
   }
 
-  private class IMap<V extends DrillSerializable> implements DistributedMap<V>{
+  private class IMap<K, V> implements DistributedMap<K, V>{
 
-    private Cache<String, V> cache;
+    private Cache<K, V> cache;
+    private CacheConfig<K, V> config;
 
-
-    public IMap(Cache<String, V> cache) {
+    public IMap(Cache<K, V> cache, CacheConfig<K, V> config) {
       super();
       this.cache = cache;
+      this.config = config;
+    }
+
+    @Override
+    public Iterable<Entry<K, V>> getLocalEntries() {
+      return cache.entrySet();
     }
 
     @Override
-    public V get(String key) {
+    public V get(K key) {
       return cache.get(key);
     }
 
     @Override
-    public void put(String key, V value) {
-      cache.put(key,  value);
+    public void delete(K key) {
+      cache.remove(key);
     }
 
     @Override
-    public void putIfAbsent(String key, V value) {
-      cache.putIfAbsent(key,  value);
+    public void put(K key, V value) {
+      cache.put(key,  value);
     }
 
     @Override
-    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
-      cache.putIfAbsent(key, value, ttl, timeUnit);
+    public void putIfAbsent(K key, V value) {
+      cache.putIfAbsent(key,  value);
     }
 
     @Override
-    public Iterator<Entry<String, V>> iterator() {
-      return cache.entrySet().iterator();
+    public void putIfAbsent(K key, V value, long ttl, TimeUnit timeUnit) {
+      cache.putIfAbsent(key, value, ttl, timeUnit);
     }
 
   }
 
-  private class IMulti<V extends DrillSerializable> implements DistributedMultiMap<V>{
+  private class IMulti<K, V> implements DistributedMultiMap<K, V>{
 
-    private Cache<String, DeltaList<V>> cache;
-    private Class<V> clazz;
+    private Cache<K, DeltaList<V>> cache;
+    private CacheConfig<K, V> config;
 
-    public IMulti(Cache<String, DeltaList<V>> cache, Class<V> clazz) {
+    public IMulti(Cache<K, DeltaList<V>> cache, CacheConfig<K, V> config) {
       super();
       this.cache = cache;
-      this.clazz = clazz;
+      this.config = config;
     }
 
     @Override
-    public Collection<V> get(String key) {
+    public Collection<V> get(K key) {
       return cache.get(key);
     }
 
     @Override
-    public void put(String key, V value) {
+    public void put(K key, V value) {
       cache.put(key, new DeltaList<V>(value));
-//      cache.getAdvancedCache().applyDelta(key, new DeltaList<V>(value), key);
     }
 
   }
@@ -230,7 +256,7 @@ public class ICache implements DistributedCache{
 
 
 
-  private static class DeltaList<V extends DrillSerializable> extends LinkedList<V> implements DeltaAware, Delta{
+  private static class DeltaList<V> extends LinkedList<V> implements DeltaAware, Delta{
 
     /** The serialVersionUID */
     private static final long serialVersionUID = 2176345973026460708L;
@@ -270,18 +296,4 @@ public class ICache implements DistributedCache{
     }
  }
 
-
-//  public void run() {
-//    Config c = new Config();
-//    SerializerConfig sc = new SerializerConfig() //
-//      .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
-//      .setTypeClass(VectorAccessibleSerializable.class);
-//    c.setInstanceName(instanceName);
-//    c.getSerializationConfig().addSerializerConfig(sc);
-//    instance = getInstanceOrCreateNew(c);
-//    workQueueLengths = instance.getTopic("queue-length");
-//    fragments = new HandlePlan(instance);
-//    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
-//    workQueueLengths.addMessageListener(new Listener());
-//  }
 }