Posted to dev@drill.apache.org by ja...@apache.org on 2014/05/22 03:14:48 UTC

[11/24] git commit: ispan

ispan


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8621b682
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8621b682
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8621b682

Branch: refs/heads/diagnostics2
Commit: 8621b682cae0f8c8f58ff2e5b750544113bf52ee
Parents: 5472140
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun May 4 16:58:41 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon May 19 09:12:27 2014 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |  10 +
 .../exec/cache/AbstractDataSerializable.java    |  33 ++
 .../exec/cache/AbstractStreamSerializable.java  |  34 ++
 .../drill/exec/cache/CachedVectorContainer.java |  77 +++++
 .../drill/exec/cache/DistributedCache.java      |   4 +-
 .../drill/exec/cache/DrillSerializable.java     |  17 +-
 .../cache/HCVectorAccessibleSerializer.java     |  56 ----
 .../org/apache/drill/exec/cache/HazelCache.java | 252 ---------------
 .../exec/cache/JacksonDrillSerializable.java    |  48 +--
 .../org/apache/drill/exec/cache/LocalCache.java | 305 ------------------
 .../cache/LoopedAbstractDrillSerializable.java  |  80 +++++
 .../apache/drill/exec/cache/ProtoBufImpl.java   |  49 ---
 .../apache/drill/exec/cache/ProtoBufWrap.java   |  67 ----
 .../org/apache/drill/exec/cache/ProtoMap.java   |  52 ----
 .../drill/exec/cache/ProtoSerializable.java     |  65 ++++
 .../cache/VectorAccessibleSerializable.java     |  22 +-
 .../hazel/HCVectorAccessibleSerializer.java     |  58 ++++
 .../drill/exec/cache/hazel/HazelCache.java      | 258 ++++++++++++++++
 .../drill/exec/cache/hazel/ProtoBufImpl.java    |  49 +++
 .../drill/exec/cache/hazel/ProtoBufWrap.java    |  67 ++++
 .../apache/drill/exec/cache/hazel/ProtoMap.java |  52 ++++
 .../drill/exec/cache/infinispan/ICache.java     | 266 ++++++++++++++++
 .../infinispan/VAAdvancedExternalizer.java      |  72 +++++
 .../cache/infinispan/ZookeeperCacheStore.java   |  66 ++++
 .../drill/exec/cache/local/LocalCache.java      | 309 +++++++++++++++++++
 .../OrderedPartitionRecordBatch.java            |  20 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   2 +-
 .../drill/exec/server/RemoteServiceSet.java     |  17 +-
 .../java/org/apache/drill/PlanningBase.java     |   2 +-
 .../java/org/apache/drill/exec/cache/ISpan.java |  94 ++++++
 .../drill/exec/cache/TestVectorCache.java       | 136 +++++---
 .../drill/exec/store/TestOrphanSchema.java      |   2 +-
 .../drill/exec/store/ischema/OrphanSchema.java  |   9 +-
 33 files changed, 1750 insertions(+), 900 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index d693630..6d11614 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -224,6 +224,16 @@
       <version>3.1.4</version>
     </dependency>
     <dependency>
+      <groupId>org.infinispan</groupId>
+      <artifactId>infinispan-core</artifactId>
+      <version>6.0.1.Final</version>
+    </dependency>
+    <dependency>
+      <groupId>org.infinispan</groupId>
+      <artifactId>infinispan-tree</artifactId>
+      <version>6.0.1.Final</version>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.janino</groupId>
       <artifactId>janino</artifactId>
       <version>2.6.1</version>
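
The two Infinispan artifacts added above back the new org.apache.drill.exec.cache.infinispan.ICache introduced later in this diff. For context, a minimal sketch of bringing up an embedded Infinispan 6.x cache with these dependencies; the class name, cache contents, and configuration here are illustrative and not taken from ICache:

    import org.infinispan.Cache;
    import org.infinispan.configuration.cache.ConfigurationBuilder;
    import org.infinispan.manager.DefaultCacheManager;

    public class EmbeddedInfinispanSketch {
      public static void main(String[] args) {
        // Purely local, programmatic configuration; a clustered setup would add a
        // GlobalConfigurationBuilder with a JGroups transport instead.
        DefaultCacheManager manager = new DefaultCacheManager(new ConfigurationBuilder().build());
        Cache<String, byte[]> cache = manager.getCache();   // default cache
        cache.put("key", new byte[] {1, 2, 3});
        System.out.println(cache.get("key").length);        // 3
        manager.stop();
      }
    }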

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
new file mode 100644
index 0000000..f7b9eed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractDataSerializable.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public abstract class AbstractDataSerializable extends LoopedAbstractDrillSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractDataSerializable.class);
+
+  @Override
+  public abstract void read(DataInput input) throws IOException;
+
+  @Override
+  public abstract void write(DataOutput output) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
new file mode 100644
index 0000000..ef488d6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/AbstractStreamSerializable.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public abstract class AbstractStreamSerializable extends LoopedAbstractDrillSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractStreamSerializable.class);
+
+  @Override
+  public abstract void readFromStream(InputStream input) throws IOException;
+
+  @Override
+  public abstract void writeToStream(OutputStream output) throws IOException;
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
new file mode 100644
index 0000000..1447e28
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CachedVectorContainer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.WritableBatch;
+
+public class CachedVectorContainer extends LoopedAbstractDrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachedVectorContainer.class);
+
+  private final byte[] data;
+  private final BufferAllocator allocator;
+  private VectorContainer container;
+
+  public CachedVectorContainer(WritableBatch batch, BufferAllocator allocator) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(batch, allocator);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    va.writeToStream(baos);
+    this.allocator = allocator;
+    this.data = baos.toByteArray();
+    va.clear();
+  }
+
+  public CachedVectorContainer(byte[] data, BufferAllocator allocator) {
+    this.data = data;
+    this.allocator = allocator;
+  }
+
+  private void construct() {
+    try {
+      VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+      va.readFromStream(new ByteArrayInputStream(data));
+      this.container = va.get();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+
+  }
+
+  public VectorAccessible get() {
+    if (container == null) {
+      construct();
+    }
+    return container;
+  }
+
+  public void clear() {
+    container.clear();
+    container = null;
+  }
+
+  public byte[] getData(){
+    return data;
+  }
+
+}
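
CachedVectorContainer above holds only the serialized byte[] and rebuilds the VectorContainer lazily on the first get(). A rough usage sketch; batch (a WritableBatch) and allocator are placeholders for values a real caller would already have:

    // Producing side: serialize once; getData() is what a distributed cache would store.
    CachedVectorContainer cached = new CachedVectorContainer(batch, allocator);
    byte[] bytes = cached.getData();

    // Consuming side: rebuild from the raw bytes with a local allocator.
    CachedVectorContainer fromCache = new CachedVectorContainer(bytes, allocator);
    VectorAccessible vectors = fromCache.get();   // deserializes on first access
    // ... read the vectors ...
    fromCache.clear();                            // release the rebuilt container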

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
index 65362e0..aa87162 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DistributedCache.java
@@ -17,14 +17,12 @@
  */
 package org.apache.drill.exec.cache;
 
-import java.io.Closeable;
-
 import org.apache.drill.exec.exception.DrillbitStartupException;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 
 
-public interface DistributedCache extends Closeable{
+public interface DistributedCache extends AutoCloseable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedCache.class);
 
   public void run() throws DrillbitStartupException;
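
Moving DistributedCache from Closeable to AutoCloseable lets callers manage the cache with try-with-resources. An illustrative sketch, assuming the relocated LocalCache keeps its no-argument constructor (hypothetical bootstrap; real callers wire the cache through Drillbit/RemoteServiceSet):

    try (DistributedCache cache = new LocalCache()) {
      cache.run();                                          // start the cache
      Counter counter = cache.getCounter("fragments-seen"); // counter name is illustrative
      counter.incrementAndGet();
    } catch (Exception e) {                                 // AutoCloseable.close() may throw Exception
      throw new RuntimeException(e);
    }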

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 4f266f7..21ed37c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,19 +17,20 @@
  */
 package org.apache.drill.exec.cache;
 
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-import java.io.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 
 /**
  * Classes that can be put in the Distributed Cache must implement this interface.
  */
-public interface DrillSerializable {
+public interface DrillSerializable extends Externalizable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
-  public void readData(ObjectDataInput input) throws IOException;
+  public void read(DataInput input) throws IOException;
   public void readFromStream(InputStream input) throws IOException;
-  public void writeData(ObjectDataOutput output) throws IOException;
+  public void write(DataOutput output) throws IOException;
   public void writeToStream(OutputStream output) throws IOException;
 }
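
With the Hazelcast types gone from the interface, a cacheable value now picks one of the two new abstract bases and implements a single read/write pair; LoopedAbstractDrillSerializable (added below) supplies the rest. A minimal hypothetical value type, not part of this commit:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.drill.exec.cache.AbstractDataSerializable;

    // Stores a single long via the DataInput/DataOutput half of the contract;
    // the stream and Externalizable methods are inherited.
    public class LongHolder extends AbstractDataSerializable {
      private long value;

      public LongHolder() {}                          // public no-arg constructor for Externalizable
      public LongHolder(long value) { this.value = value; }

      @Override
      public void read(DataInput input) throws IOException {
        value = input.readLong();
      }

      @Override
      public void write(DataOutput output) throws IOException {
        output.writeLong(value);
      }

      public long getValue() { return value; }
    }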

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
deleted file mode 100644
index 0d5ba96..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.StreamSerializer;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.server.DrillbitContext;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
-
-  private BufferAllocator allocator;
-
-  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
-    this.allocator = allocator;
-  }
-
-  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
-    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
-    va.readFromStream(DataInputInputStream.constructInputStream(in));
-    return va;
-  }
-
-  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
-    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
-  }
-
-  public void destroy() {}
-
-  public int getTypeId() {
-    return 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
deleted file mode 100644
index 0149a57..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
-import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.config.SerializerConfig;
-import com.hazelcast.core.DuplicateInstanceNameException;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.ITopic;
-import com.hazelcast.core.Message;
-import com.hazelcast.core.MessageListener;
-import com.hazelcast.nio.serialization.StreamSerializer;
-
-public class HazelCache implements DistributedCache {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
-
-  private final String instanceName;
-  private HazelcastInstance instance;
-  private ITopic<HWorkQueueStatus> workQueueLengths;
-  private HandlePlan fragments;
-  private Cache<WorkQueueStatus, Integer>  endpoints;
-  private BufferAllocator allocator;
-  private DrillConfig config;
-
-  public HazelCache(DrillConfig config, BufferAllocator allocator) {
-    this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
-    this.allocator = allocator;
-    this.config = config;
-  }
-
-  private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
-    SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
-    c.getSerializationConfig().addSerializerConfig(sc);
-  }
-
-  @SuppressWarnings("rawtypes")
-  private <T> void addJSer(Config c, SerializationDefinition d){
-    SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
-    c.getSerializationConfig().addSerializerConfig(sc);
-  }
-
-
-  private class Listener implements MessageListener<HWorkQueueStatus>{
-
-    @Override
-    public void onMessage(Message<HWorkQueueStatus> wrapped) {
-      logger.debug("Received new queue length message.");
-      endpoints.put(wrapped.getMessageObject().get(), 0);
-    }
-
-  }
-
-  public void run() {
-    Config c = new Config();
-    addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
-    addJSer(c, SerializationDefinition.OPTION);
-    addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
-
-    c.setInstanceName(instanceName);
-    c.getGroupConfig().setName(instanceName);
-    for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
-      logger.debug("Adding interface: {}", s);
-      c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
-    }
-
-    instance = getInstanceOrCreateNew(c);
-    workQueueLengths = instance.getTopic("queue-length");
-    fragments = new HandlePlan(instance);
-    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
-    workQueueLengths.addMessageListener(new Listener());
-  }
-
-  private HazelcastInstance getInstanceOrCreateNew(Config c) {
-    for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
-      if (instance.getName().equals(this.instanceName))
-        return instance;
-    }
-    try {
-    return Hazelcast.newHazelcastInstance(c);
-    } catch (DuplicateInstanceNameException e) {
-      return getInstanceOrCreateNew(c);
-    }
-  }
-
-//  @Override
-//  public void updateLocalQueueLength(int length) {
-//    workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
-//        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
-//  }
-//
-//  @Override
-//  public List<WorkQueueStatus> getQueueLengths() {
-//    return Lists.newArrayList(endpoints.asMap().keySet());
-//  }
-
-  @Override
-  public void close() throws IOException {
-    this.instance.getLifecycleService().shutdown();
-  }
-
-  @Override
-  public PlanFragment getFragment(FragmentHandle handle) {
-    return this.fragments.get(handle);
-  }
-
-  @Override
-  public void storeFragment(PlanFragment fragment) {
-    fragments.put(fragment.getHandle(), fragment);
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
-    com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
-    return new HCDistributedMultiMapImpl<V>(mmap, clazz);
-  }
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
-    return getNamedMap(clazz.getName(), clazz);
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
-    IMap<String, V> imap = this.instance.getMap(name);
-    MapConfig myMapConfig = new MapConfig();
-    myMapConfig.setBackupCount(0);
-    myMapConfig.setReadBackupData(true);
-    instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
-    return new HCDistributedMapImpl<V>(imap);
-  }
-
-  @Override
-  public Counter getCounter(String name) {
-    return new HCCounterImpl(this.instance.getAtomicLong(name));
-  }
-
-
-
-  public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
-    private final IMap<String, V> m;
-
-    public HCDistributedMapImpl(IMap<String, V> m) {
-      this.m = m;
-    }
-
-    public V get(String key) {
-      return m.get(key);
-    }
-
-    public void put(String key, V value) {
-      m.put(key, value);
-    }
-
-    public void putIfAbsent(String key, V value) {
-      m.putIfAbsent(key, value);
-    }
-
-    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
-      m.putIfAbsent(key, value, ttl, timeunit);
-
-    }
-
-    @Override
-    public Iterator<Entry<String, V>> iterator() {
-      return m.entrySet().iterator();
-    }
-
-
-  }
-
-  public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
-    private com.hazelcast.core.MultiMap<String, V> mmap;
-
-    public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
-      this.mmap = mmap;
-    }
-
-    public Collection<V> get(String key) {
-      List<V> list = Lists.newArrayList();
-      for (V v : mmap.get(key)) {
-        list.add(v);
-      }
-      return list;
-    }
-
-    @Override
-    public void put(String key, V value) {
-      mmap.put(key, value);
-    }
-  }
-
-  public static class HCCounterImpl implements Counter {
-    private IAtomicLong n;
-
-    public HCCounterImpl(IAtomicLong n) {
-      this.n = n;
-    }
-
-    public long get() {
-      return n.get();
-    }
-
-    public long incrementAndGet() {
-      return n.incrementAndGet();
-    }
-
-    public long decrementAndGet() {
-      return n.decrementAndGet();
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
index dcfc1ec..617c356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/JacksonDrillSerializable.java
@@ -17,47 +17,39 @@
  */
 package org.apache.drill.exec.cache;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
 import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.planner.logical.StoragePlugins;
 import org.apache.drill.exec.server.DrillbitContext;
 
-import java.io.*;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
-public abstract class JacksonDrillSerializable<T> implements DrillSerializable, DataSerializable{
+public abstract class JacksonDrillSerializable<T> extends LoopedAbstractDrillSerializable implements DrillSerializable{
   private ObjectMapper mapper;
   private T obj;
+  private Class<T> clazz;
 
-  public JacksonDrillSerializable(DrillbitContext context, T obj) {
+  public JacksonDrillSerializable(DrillbitContext context, T obj, Class<T> clazz) {
+    this(clazz);
     this.mapper = context.getConfig().getMapper();
     this.obj = obj;
   }
 
-  public JacksonDrillSerializable() {
+  public JacksonDrillSerializable(Class<T> clazz) {
+    this.clazz = clazz;
   }
 
   @Override
-  public void readData(ObjectDataInput input) throws IOException {
-    readFromStream(DataInputInputStream.constructInputStream(input));
-  }
-
-  public void readFromStream(InputStream input, Class clazz) throws IOException {
+  public void readFromStream(InputStream input) throws IOException {
     mapper = DrillConfig.create().getMapper();
     obj = (T) mapper.readValue(input, clazz);
   }
 
   @Override
-  public void writeData(ObjectDataOutput output) throws IOException {
-    writeToStream(DataOutputOutputStream.constructOutputStream(output));
-  }
-
-  @Override
   public void writeToStream(OutputStream output) throws IOException {
     output.write(mapper.writeValueAsBytes(obj));
   }
@@ -66,4 +58,20 @@ public abstract class JacksonDrillSerializable<T> implements DrillSerializable,
     return obj;
   }
 
+  public static class StoragePluginsSerializable extends JacksonDrillSerializable<StoragePlugins> {
+
+    public StoragePluginsSerializable(DrillbitContext context, StoragePlugins obj) {
+      super(context, obj, StoragePlugins.class);
+    }
+
+    public StoragePluginsSerializable(BufferAllocator allocator) {
+      super(StoragePlugins.class);
+    }
+
+    public StoragePluginsSerializable() {
+      super(StoragePlugins.class);
+    }
+
+
+  }
 }
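
StoragePluginsSerializable above is the Jackson-backed wrapper used to push storage plugin definitions through the cache. A rough sketch; cache (a running DistributedCache), context (a DrillbitContext), and plugins (a StoragePlugins instance) are placeholders, and the accessor name getObj() is assumed from the getter whose body appears above:

    DistributedMap<StoragePluginsSerializable> map =
        cache.getNamedMap("storage-plugins", StoragePluginsSerializable.class);

    map.putIfAbsent("sys.plugins", new StoragePluginsSerializable(context, plugins));

    StoragePluginsSerializable stored = map.get("sys.plugins");
    if (stored != null) {
      StoragePlugins roundTripped = stored.getObj();  // deserialized via Jackson on the reading side
    }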

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
deleted file mode 100644
index 0fb4b82..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.exception.DrillbitStartupException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.io.ByteArrayDataInput;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
-
-public class LocalCache implements DistributedCache {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
-
-  private volatile Map<FragmentHandle, PlanFragment> handles;
-  private volatile ConcurrentMap<String, DistributedMap<?>> namedMaps;
-  private volatile ConcurrentMap<Class<?>, DistributedMap<?>> maps;
-  private volatile ConcurrentMap<Class<?>, DistributedMultiMap<?>> multiMaps;
-  private volatile ConcurrentMap<String, Counter> counters;
-  private static final BufferAllocator allocator = new TopLevelAllocator();
-
-  private static final ObjectMapper mapper = DrillConfig.create().getMapper();
-
-  @Override
-  public void close() throws IOException {
-    handles = null;
-  }
-
-  @Override
-  public void run() throws DrillbitStartupException {
-    handles = Maps.newConcurrentMap();
-    maps = Maps.newConcurrentMap();
-    multiMaps = Maps.newConcurrentMap();
-    counters = Maps.newConcurrentMap();
-    namedMaps = Maps.newConcurrentMap();
-  }
-
-  @Override
-  public PlanFragment getFragment(FragmentHandle handle) {
-//    logger.debug("looking for fragment with handle: {}", handle);
-    return handles.get(handle);
-  }
-
-  @Override
-  public void storeFragment(PlanFragment fragment) {
-//    logger.debug("Storing fragment: {}", fragment);
-    handles.put(fragment.getHandle(), fragment);
-  }
-
-  @Override
-  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
-    DistributedMultiMap<V> mmap = (DistributedMultiMap<V>) multiMaps.get(clazz);
-    if (mmap == null) {
-      multiMaps.putIfAbsent(clazz, new LocalDistributedMultiMapImpl<V>(clazz));
-      return (DistributedMultiMap<V>) multiMaps.get(clazz);
-    } else {
-      return mmap;
-    }
-  }
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
-    DistributedMap m = maps.get(clazz);
-    if (m == null) {
-      maps.putIfAbsent(clazz, new LocalDistributedMapImpl<V>(clazz));
-      return (DistributedMap<V>) maps.get(clazz);
-    } else {
-      return m;
-    }
-  }
-
-
-  @Override
-  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
-    DistributedMap m = namedMaps.get(clazz);
-    if (m == null) {
-      namedMaps.putIfAbsent(name, new LocalDistributedMapImpl<V>(clazz));
-      return (DistributedMap<V>) namedMaps.get(name);
-    } else {
-      return m;
-    }
-  }
-
-  @Override
-  public Counter getCounter(String name) {
-    Counter c = counters.get(name);
-    if (c == null) {
-      counters.putIfAbsent(name, new LocalCounterImpl());
-      return counters.get(name);
-    } else {
-      return c;
-    }
-  }
-
-  public static ByteArrayDataOutput serialize(DrillSerializable obj) {
-    if(obj instanceof JacksonSerializable){
-      try{
-        ByteArrayDataOutput out = ByteStreams.newDataOutput();
-        out.write(mapper.writeValueAsBytes(obj));
-        return out;
-      }catch(Exception e){
-        throw new RuntimeException(e);
-      }
-    }
-
-    ByteArrayDataOutput out = ByteStreams.newDataOutput();
-    OutputStream outputStream = DataOutputOutputStream.constructOutputStream(out);
-    try {
-      obj.writeToStream(outputStream);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    try {
-      outputStream.flush();
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return out;
-  }
-
-  public static <V extends DrillSerializable> V deserialize(byte[] bytes, Class<V> clazz) {
-    if(JacksonSerializable.class.isAssignableFrom(clazz)){
-      try{
-        return (V) mapper.readValue(bytes, clazz);
-      }catch(Exception e){
-        throw new RuntimeException(e);
-      }
-    }
-
-    ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
-    InputStream inputStream = DataInputInputStream.constructInputStream(in);
-    try {
-      V obj = clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
-      obj.readFromStream(inputStream);
-      return obj;
-    } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static class LocalDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
-    private ArrayListMultimap<String, ByteArrayDataOutput> mmap;
-    private Class<V> clazz;
-
-    public LocalDistributedMultiMapImpl(Class<V> clazz) {
-      mmap = ArrayListMultimap.create();
-      this.clazz = clazz;
-    }
-
-    @Override
-    public Collection<V> get(String key) {
-      List<V> list = Lists.newArrayList();
-      for (ByteArrayDataOutput o : mmap.get(key)) {
-        list.add(deserialize(o.toByteArray(), this.clazz));
-      }
-      return list;
-    }
-
-    @Override
-    public void put(String key, DrillSerializable value) {
-      mmap.put(key, serialize(value));
-    }
-  }
-
-  public static class LocalDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
-    protected ConcurrentMap<String, ByteArrayDataOutput> m;
-    protected Class<V> clazz;
-
-    public LocalDistributedMapImpl(Class<V> clazz) {
-      m = Maps.newConcurrentMap();
-      this.clazz = clazz;
-    }
-
-    @Override
-    public V get(String key) {
-      if (m.get(key) == null) return null;
-      ByteArrayDataOutput b = m.get(key);
-      byte[] bytes = b.toByteArray();
-      return (V) deserialize(m.get(key).toByteArray(), this.clazz);
-    }
-
-    @Override
-    public void put(String key, V value) {
-      m.put(key, serialize(value));
-    }
-
-    @Override
-    public void putIfAbsent(String key, V value) {
-      m.putIfAbsent(key, serialize(value));
-    }
-
-    @Override
-    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
-      m.putIfAbsent(key, serialize(value));
-      logger.warn("Expiration not implemented in local map cache");
-    }
-
-    private class DeserializingTransformer implements Iterator<Map.Entry<String, V> >{
-      private Iterator<Map.Entry<String, ByteArrayDataOutput>> inner;
-
-      public DeserializingTransformer(Iterator<Entry<String, ByteArrayDataOutput>> inner) {
-        super();
-        this.inner = inner;
-      }
-
-      @Override
-      public boolean hasNext() {
-        return inner.hasNext();
-      }
-
-      @Override
-      public Entry<String, V> next() {
-        return newEntry(inner.next());
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-
-      public Entry<String, V> newEntry(final Entry<String, ByteArrayDataOutput> input) {
-        return new Map.Entry<String, V>(){
-
-          @Override
-          public String getKey() {
-            return input.getKey();
-          }
-
-          @Override
-          public V getValue() {
-            return deserialize(input.getValue().toByteArray(), clazz);
-          }
-
-          @Override
-          public V setValue(V value) {
-            throw new UnsupportedOperationException();
-          }
-
-        };
-      }
-
-    }
-    @Override
-    public Iterator<Entry<String, V>> iterator() {
-      return new DeserializingTransformer(m.entrySet().iterator());
-    }
-  }
-
-  public static class LocalCounterImpl implements Counter {
-    private AtomicLong al = new AtomicLong();
-
-    @Override
-    public long get() {
-      return al.get();
-    }
-
-    @Override
-    public long incrementAndGet() {
-      return al.incrementAndGet();
-    }
-
-    @Override
-    public long decrementAndGet() {
-      return al.decrementAndGet();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
new file mode 100644
index 0000000..1de030d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+
+/**
+ * Helper class that holds the basic functionality to interchangeably use the different Drill serializable interfaces.
+ * This class is package private; users should extend either AbstractDataSerializable or AbstractStreamSerializable instead
+ * to avoid infinite delegation loops.
+ */
+abstract class LoopedAbstractDrillSerializable implements DrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LoopedAbstractDrillSerializable.class);
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    writeToStream(baos);
+    byte[] ba = baos.toByteArray();
+    out.writeInt(ba.length);
+    out.write(ba);
+  }
+
+
+  @Override
+  public void read(DataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    read(new DataInputStream(input));
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    write(new DataOutputStream(output));
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    int len = in.readInt();
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    readFromStream(new ByteArrayInputStream(bytes));
+  }
+}
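
Because the class above implements readExternal/writeExternal by framing the stream form with a length prefix, any concrete subclass can also travel through plain Java object streams. A sketch using the FragmentHandleSerializable defined later in this diff; it assumes the length prefix is written with writeInt, matching the readInt in readExternal:

    FragmentHandleSerializable original = new FragmentHandleSerializable(
        FragmentHandle.newBuilder().setMajorFragmentId(1).setMinorFragmentId(2).build());

    ByteArrayOutputStream scratch = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(scratch)) {
      out.writeObject(original);                 // writeExternal -> writeToStream -> protobuf bytes
    }

    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(scratch.toByteArray()))) {
      FragmentHandleSerializable copy = (FragmentHandleSerializable) in.readObject();
      System.out.println(copy.getObject().getMinorFragmentId());   // prints 2
    }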

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
deleted file mode 100644
index 53b2bfa..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import org.apache.drill.exec.proto.BitControl.PlanFragment;
-import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-
-import com.hazelcast.core.HazelcastInstance;
-
-public class ProtoBufImpl {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
-  
-  public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
-    public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
-    public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
-  }
-  
-  public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
-    public HFragmentHandle() {super(FragmentHandle.PARSER);}
-    public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
-  }
-  
-  public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
-    public HPlanFragment() {super(PlanFragment.PARSER);}
-    public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
-  }
-  
-  public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
-    public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
-    public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
-    public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
deleted file mode 100644
index d55c133..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.io.IOException;
-
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
-import com.hazelcast.nio.serialization.DataSerializable;
-
-public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
-  
-  T value;
-  final Parser<T> parser;
-  
-  public ProtoBufWrap(Parser<T> parser){
-    this(null, parser);
-  }
-  
-  public ProtoBufWrap(T value, Parser<T> parser){
-    this.value = value;
-    this.parser = parser;
-  }
-  
-  @Override
-  public void readData(ObjectDataInput arg0) throws IOException {
-    int len = arg0.readShort();
-    byte[] b = new byte[len];
-    arg0.readFully(b);
-    this.value = parser.parseFrom(b);
-  }
-
-  @Override
-  public void writeData(ObjectDataOutput arg0) throws IOException {
-    byte[] b = value.toByteArray();
-    if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
-    arg0.writeShort(b.length);
-    arg0.write(b);
-  }
-
-  protected T get() {
-    return value;
-  }
-
-  protected void set(T value) {
-    this.value = value;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
deleted file mode 100644
index 1de1c4e..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoMap.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IMap;
-
-public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
-
-  private IMap<HK, HV> hzMap;
-  
-  public ProtoMap(HazelcastInstance instance, String mapName){
-    hzMap = instance.getMap(mapName);
-  }
-  
-  public V get(K key){
-    Preconditions.checkNotNull(key);
-    HK hk = getNewKey(key);
-    HV hv = hzMap.get(hk);
-    if(hv == null) return null;
-    return hv.get();
-  }
-  
-  public V put(K key, V value){
-    Preconditions.checkNotNull(key);
-    Preconditions.checkNotNull(value);
-    HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
-    return oldValue == null ? null : oldValue.get();
-  }
-  
-  public abstract HK getNewKey(K key);
-  public abstract HV getNewValue(V key);
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
new file mode 100644
index 0000000..1538a85
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.protobuf.Message;
+import com.google.protobuf.Parser;
+
+public abstract class ProtoSerializable<V extends Message> extends AbstractStreamSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoSerializable.class);
+
+  private Parser<V> protoParser;
+  private V obj;
+
+  ProtoSerializable(Parser<V> protoParser, V obj) {
+    super();
+    this.protoParser = protoParser;
+    this.obj = obj;
+  }
+
+  public V getObject(){
+    return obj;
+  }
+
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    obj = protoParser.parseFrom(input);
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    obj.writeTo(output);
+  }
+
+  public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{
+    public PlanFragmentSerializable(PlanFragment obj) {super(PlanFragment.PARSER, obj);}
+    public PlanFragmentSerializable(){this(null);}
+  }
+  public static class FragmentHandleSerializable extends ProtoSerializable<FragmentHandle>{
+    public FragmentHandleSerializable(FragmentHandle obj) {super(FragmentHandle.PARSER, obj);}
+    public FragmentHandleSerializable(){this(null);}
+  }
+
+}
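
PlanFragmentSerializable and FragmentHandleSerializable above replace the Hazelcast-specific ProtoBufWrap subclasses; the stream pair maps directly onto protobuf's writeTo/parseFrom. A small round-trip sketch (the PlanFragment built here is a placeholder):

    PlanFragment fragment = PlanFragment.newBuilder()
        .setHandle(FragmentHandle.newBuilder().setMajorFragmentId(0).setMinorFragmentId(3))
        .build();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    new PlanFragmentSerializable(fragment).writeToStream(out);      // PlanFragment.writeTo(out)

    PlanFragmentSerializable read = new PlanFragmentSerializable(); // empty wrapper, parser supplied internally
    read.readFromStream(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(read.getObject().getHandle().getMinorFragmentId());  // prints 3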

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 073a8d5..bb3f527 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -19,18 +19,18 @@ package org.apache.drill.exec.cache;
 
 import io.netty.buffer.ByteBuf;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.VectorAccessible;
@@ -43,19 +43,17 @@ import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.hazelcast.nio.ObjectDataInput;
-import com.hazelcast.nio.ObjectDataOutput;
 
 /**
  * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
  * from an InputStream and construct a new VectorContainer.
  */
-public class VectorAccessibleSerializable implements DrillSerializable {
+public class VectorAccessibleSerializable extends AbstractStreamSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
-  private VectorAccessible va;
+  private VectorContainer va;
   private WritableBatch batch;
   private BufferAllocator allocator;
   private int recordCount = -1;
@@ -91,10 +89,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     }
   }
 
-  @Override
-  public void readData(ObjectDataInput input) throws IOException {
-    readFromStream(DataInputInputStream.constructInputStream(input));
-  }
 
   /**
    * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exists
@@ -134,10 +128,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     va = container;
   }
 
-  @Override
-  public void writeData(ObjectDataOutput output) throws IOException {
-    writeToStream(DataOutputOutputStream.constructOutputStream(output));
-  }
 
   public void writeToStreamAndRetain(OutputStream output) throws IOException {
     retain = true;
@@ -208,7 +198,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     }
   }
 
-  public VectorAccessible get() {
+  public VectorContainer get() {
     return va;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..bac2323
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HCVectorAccessibleSerializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.IOException;
+
+/**
+ * Serializes and deserializes VectorAccessibleSerializable objects so they can be stored in the Hazelcast implementation of the distributed cache.
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+  private BufferAllocator allocator;
+
+  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+    va.readFromStream(DataInputInputStream.constructInputStream(in));
+    return va;
+  }
+
+  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+  }
+
+  public void destroy() {}
+
+  public int getTypeId() {
+    return 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
new file mode 100644
index 0000000..06518b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/HazelCache.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.Counter;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HWorkQueueStatus;
+import org.apache.drill.exec.cache.hazel.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.hazelcast.config.Config;
+import com.hazelcast.config.MapConfig;
+import com.hazelcast.config.SerializerConfig;
+import com.hazelcast.core.DuplicateInstanceNameException;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IAtomicLong;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+import com.hazelcast.nio.serialization.StreamSerializer;
+
+public class HazelCache implements DistributedCache {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
+
+  private final String instanceName;
+  private HazelcastInstance instance;
+  private ITopic<HWorkQueueStatus> workQueueLengths;
+  private HandlePlan fragments;
+  private Cache<WorkQueueStatus, Integer>  endpoints;
+  private BufferAllocator allocator;
+  private DrillConfig config;
+
+  public HazelCache(DrillConfig config, BufferAllocator allocator) {
+    this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+    this.allocator = allocator;
+    this.config = config;
+  }
+
+  private <T> void addSer(Config c, StreamSerializer<T> serializer, Class<T> clazz){
+    SerializerConfig sc = new SerializerConfig().setImplementation(serializer).setTypeClass(clazz);
+    c.getSerializationConfig().addSerializerConfig(sc);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private <T> void addJSer(Config c, SerializationDefinition d){
+    SerializerConfig sc = new SerializerConfig().setImplementation(new JacksonAdvancedSerializer(d, config.getMapper())).setTypeClass(d.clazz);
+    c.getSerializationConfig().addSerializerConfig(sc);
+  }
+
+
+  private class Listener implements MessageListener<HWorkQueueStatus>{
+
+    @Override
+    public void onMessage(Message<HWorkQueueStatus> wrapped) {
+      logger.debug("Received new queue length message.");
+      endpoints.put(wrapped.getMessageObject().get(), 0);
+    }
+
+  }
+
+  public void run() {
+    Config c = new Config();
+    addSer(c, new HCVectorAccessibleSerializer(allocator), VectorAccessibleSerializable.class);
+    addJSer(c, SerializationDefinition.OPTION);
+    addJSer(c, SerializationDefinition.STORAGE_PLUGINS);
+
+    c.setInstanceName(instanceName);
+    c.getGroupConfig().setName(instanceName);
+    for (String s : DrillConfig.create().getStringList(ExecConstants.HAZELCAST_SUBNETS)) {
+      logger.debug("Adding interface: {}", s);
+      c.getNetworkConfig().getInterfaces().setEnabled(true).addInterface(s);
+    }
+
+    instance = getInstanceOrCreateNew(c);
+    workQueueLengths = instance.getTopic("queue-length");
+    fragments = new HandlePlan(instance);
+    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
+    workQueueLengths.addMessageListener(new Listener());
+  }
+
+  private HazelcastInstance getInstanceOrCreateNew(Config c) {
+    for (HazelcastInstance instance : Hazelcast.getAllHazelcastInstances()){
+      if (instance.getName().equals(this.instanceName))
+        return instance;
+    }
+    try {
+      return Hazelcast.newHazelcastInstance(c);
+    } catch (DuplicateInstanceNameException e) {
+      return getInstanceOrCreateNew(c);
+    }
+  }
+
+//  @Override
+//  public void updateLocalQueueLength(int length) {
+//    workQueueLengths.publish(new HWorkQueueStatus(WorkQueueStatus.newBuilder().setEndpoint(endpoint)
+//        .setQueueLength(length).setReportTime(System.currentTimeMillis()).build()));
+//  }
+//
+//  @Override
+//  public List<WorkQueueStatus> getQueueLengths() {
+//    return Lists.newArrayList(endpoints.asMap().keySet());
+//  }
+
+  @Override
+  public void close() throws IOException {
+    this.instance.getLifecycleService().shutdown();
+  }
+
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    return this.fragments.get(handle);
+  }
+
+  @Override
+  public void storeFragment(PlanFragment fragment) {
+    fragments.put(fragment.getHandle(), fragment);
+  }
+
+
+  @Override
+  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+    com.hazelcast.core.MultiMap<String, V> mmap = this.instance.getMultiMap(clazz.toString());
+    return new HCDistributedMultiMapImpl<V>(mmap, clazz);
+  }
+
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+    return getNamedMap(clazz.getName(), clazz);
+  }
+
+
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getNamedMap(String name, Class<V> clazz) {
+    IMap<String, V> imap = this.instance.getMap(name);
+    MapConfig myMapConfig = new MapConfig();
+    myMapConfig.setBackupCount(0);
+    myMapConfig.setReadBackupData(true);
+    instance.getConfig().getMapConfigs().put(clazz.toString(), myMapConfig);
+    return new HCDistributedMapImpl<V>(imap);
+  }
+
+  @Override
+  public Counter getCounter(String name) {
+    return new HCCounterImpl(this.instance.getAtomicLong(name));
+  }
+
+
+
+  public static class HCDistributedMapImpl<V extends DrillSerializable> implements DistributedMap<V> {
+    private final IMap<String, V> m;
+
+    public HCDistributedMapImpl(IMap<String, V> m) {
+      this.m = m;
+    }
+
+    public V get(String key) {
+      return m.get(key);
+    }
+
+    public void put(String key, V value) {
+      m.put(key, value);
+    }
+
+    public void putIfAbsent(String key, V value) {
+      m.putIfAbsent(key, value);
+    }
+
+    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeunit) {
+      m.putIfAbsent(key, value, ttl, timeunit);
+
+    }
+
+    @Override
+    public Iterator<Entry<String, V>> iterator() {
+      return m.entrySet().iterator();
+    }
+
+
+  }
+
+  public static class HCDistributedMultiMapImpl<V extends DrillSerializable> implements DistributedMultiMap<V> {
+    private com.hazelcast.core.MultiMap<String, V> mmap;
+
+    public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap<String, V> mmap, Class<V> clazz) {
+      this.mmap = mmap;
+    }
+
+    public Collection<V> get(String key) {
+      List<V> list = Lists.newArrayList();
+      for (V v : mmap.get(key)) {
+        list.add(v);
+      }
+      return list;
+    }
+
+    @Override
+    public void put(String key, V value) {
+      mmap.put(key, value);
+    }
+  }
+
+  public static class HCCounterImpl implements Counter {
+    private IAtomicLong n;
+
+    public HCCounterImpl(IAtomicLong n) {
+      this.n = n;
+    }
+
+    public long get() {
+      return n.get();
+    }
+
+    public long incrementAndGet() {
+      return n.incrementAndGet();
+    }
+
+    public long decrementAndGet() {
+      return n.decrementAndGet();
+    }
+  }
+
+}
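
For orientation, a minimal usage sketch of the relocated HazelCache (illustrative only, not
part of the patch; the allocator is assumed to come from the Drillbit context, and
PlanFragment.getDefaultInstance() stands in for a real fragment):

  import org.apache.drill.common.config.DrillConfig;
  import org.apache.drill.exec.cache.Counter;
  import org.apache.drill.exec.cache.hazel.HazelCache;
  import org.apache.drill.exec.memory.BufferAllocator;
  import org.apache.drill.exec.proto.BitControl.PlanFragment;

  public class HazelCacheSketch {
    public static void demo(BufferAllocator allocator) throws Exception {
      HazelCache cache = new HazelCache(DrillConfig.create(), allocator);
      cache.run();                                     // joins or creates the named Hazelcast cluster

      PlanFragment fragment = PlanFragment.getDefaultInstance();   // placeholder fragment
      cache.storeFragment(fragment);                   // keyed by fragment.getHandle() via HandlePlan
      PlanFragment back = cache.getFragment(fragment.getHandle());

      Counter queries = cache.getCounter("queries");   // backed by a Hazelcast IAtomicLong
      queries.incrementAndGet();

      cache.close();
    }
  }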

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
new file mode 100644
index 0000000..d992aa7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.BitControl.WorkQueueStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
+import com.hazelcast.core.HazelcastInstance;
+
+public class ProtoBufImpl {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufImpl.class);
+  
+  public static class HWorkQueueStatus extends ProtoBufWrap<WorkQueueStatus>{
+    public HWorkQueueStatus() {super(WorkQueueStatus.PARSER);}
+    public HWorkQueueStatus(WorkQueueStatus value) {super(value, WorkQueueStatus.PARSER);}
+  }
+  
+  public static class HFragmentHandle extends ProtoBufWrap<FragmentHandle>{
+    public HFragmentHandle() {super(FragmentHandle.PARSER);}
+    public HFragmentHandle(FragmentHandle value) {super(value, FragmentHandle.PARSER);}
+  }
+  
+  public static class HPlanFragment extends ProtoBufWrap<PlanFragment>{
+    public HPlanFragment() {super(PlanFragment.PARSER);}
+    public HPlanFragment(PlanFragment value) {super(value, PlanFragment.PARSER);}
+  }
+  
+  public static class HandlePlan extends ProtoMap<FragmentHandle, PlanFragment, HFragmentHandle, HPlanFragment>{
+    public HandlePlan(HazelcastInstance instance) {super(instance, "plan-fragment-cache");}
+    public HFragmentHandle getNewKey(FragmentHandle key) {return new HFragmentHandle(key);}
+    public HPlanFragment getNewValue(PlanFragment value) {return new HPlanFragment(value);}
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
new file mode 100644
index 0000000..23a4e08
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoBufWrap.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import java.io.IOException;
+
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.DataSerializable;
+
+public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
+  
+  T value;
+  final Parser<T> parser;
+  
+  public ProtoBufWrap(Parser<T> parser){
+    this(null, parser);
+  }
+  
+  public ProtoBufWrap(T value, Parser<T> parser){
+    this.value = value;
+    this.parser = parser;
+  }
+  
+  @Override
+  public void readData(ObjectDataInput arg0) throws IOException {
+    int len = arg0.readShort();
+    byte[] b = new byte[len];
+    arg0.readFully(b);
+    this.value = parser.parseFrom(b);
+  }
+
+  @Override
+  public void writeData(ObjectDataOutput arg0) throws IOException {
+    byte[] b = value.toByteArray();
+    if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
+    arg0.writeShort(b.length);
+    arg0.write(b);
+  }
+
+  protected T get() {
+    return value;
+  }
+
+  protected void set(T value) {
+    this.value = value;
+  }
+
+}
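
ProtoBufWrap frames each protobuf as a two-byte length followed by the raw message bytes,
which is why writeData rejects anything larger than Short.MAX_VALUE. The sketch below
reproduces that framing with plain java.io streams purely for illustration; it does not touch
Hazelcast's ObjectDataInput/ObjectDataOutput:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;

  public class ProtoFramingSketch {
    public static FragmentHandle roundTrip(FragmentHandle handle) throws IOException {
      byte[] body = handle.toByteArray();

      // Write: 2-byte length prefix, then the serialized message.
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bos);
      out.writeShort(body.length);
      out.write(body);
      out.flush();

      // Read: length prefix first, then parse exactly that many bytes.
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
      byte[] read = new byte[in.readShort()];
      in.readFully(read);
      return FragmentHandle.PARSER.parseFrom(read);
    }
  }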

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
new file mode 100644
index 0000000..72d793a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/hazel/ProtoMap.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.hazel;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+
+public abstract class ProtoMap<K extends MessageLite, V extends MessageLite, HK extends ProtoBufWrap<K>, HV extends ProtoBufWrap<V>> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoMap.class);
+
+  private IMap<HK, HV> hzMap;
+  
+  public ProtoMap(HazelcastInstance instance, String mapName){
+    hzMap = instance.getMap(mapName);
+  }
+  
+  public V get(K key){
+    Preconditions.checkNotNull(key);
+    HK hk = getNewKey(key);
+    HV hv = hzMap.get(hk);
+    if(hv == null) return null;
+    return hv.get();
+  }
+  
+  public V put(K key, V value){
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+    HV oldValue = hzMap.put(getNewKey(key), getNewValue(value));
+    return oldValue == null ? null : oldValue.get();
+  }
+  
+  public abstract HK getNewKey(K key);
+  public abstract HV getNewValue(V key);
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
new file mode 100644
index 0000000..b26be7d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.Counter;
+import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.DistributedMap;
+import org.apache.drill.exec.cache.DistributedMultiMap;
+import org.apache.drill.exec.cache.DrillSerializable;
+import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
+import org.apache.drill.exec.cache.ProtoSerializable.PlanFragmentSerializable;
+import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.infinispan.Cache;
+import org.infinispan.atomic.Delta;
+import org.infinispan.atomic.DeltaAware;
+import org.infinispan.configuration.cache.CacheMode;
+import org.infinispan.configuration.cache.Configuration;
+import org.infinispan.configuration.cache.ConfigurationBuilder;
+import org.infinispan.configuration.global.GlobalConfiguration;
+import org.infinispan.configuration.global.GlobalConfigurationBuilder;
+import org.infinispan.manager.DefaultCacheManager;
+import org.infinispan.manager.EmbeddedCacheManager;
+import org.infinispan.remoting.transport.jgroups.JGroupsTransport;
+import org.jgroups.blocks.atomic.CounterService;
+import org.jgroups.fork.ForkChannel;
+import org.jgroups.protocols.COUNTER;
+import org.jgroups.protocols.FRAG2;
+import org.jgroups.stack.ProtocolStack;
+
+public class ICache implements DistributedCache{
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ICache.class);
+
+  private EmbeddedCacheManager manager;
+  private ForkChannel cacheChannel;
+  private final CounterService counters;
+  private final Cache<FragmentHandleSerializable, PlanFragmentSerializable> fragments;
+
+  public ICache(DrillConfig config, BufferAllocator allocator) throws Exception {
+    String clusterName = config.getString(ExecConstants.SERVICE_NAME);
+    GlobalConfigurationBuilder gcb = new GlobalConfigurationBuilder();
+    gcb.transport() //
+    .defaultTransport().clusterName(clusterName).build();
+    gcb.serialization() //
+    .addAdvancedExternalizer(new VAAdvancedExternalizer(allocator));
+
+    GlobalConfiguration gc = gcb.build();
+    Configuration c = new ConfigurationBuilder() //
+      .clustering() //
+      .cacheMode(CacheMode.DIST_ASYNC) //
+      .storeAsBinary() //
+      .build();
+    this.manager = new DefaultCacheManager(gc, c);
+    JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
+    this.cacheChannel = new ForkChannel(transport.getChannel(), "drill-stack", "drill-hijacker", true, ProtocolStack.ABOVE, FRAG2.class, new COUNTER());
+    this.fragments = manager.getCache(PlanFragment.class.getName());
+    this.counters = new CounterService(this.cacheChannel);
+  }
+
+  @Override
+  public void close() throws IOException {
+    manager.stop();
+  }
+
+  @Override
+  public void run() throws DrillbitStartupException {
+    try {
+      cacheChannel.connect("c1");
+    } catch (Exception e) {
+      throw new DrillbitStartupException("Failure while trying to set up JGroups.", e);
+    }
+  }
+
+  @Override
+  public PlanFragment getFragment(FragmentHandle handle) {
+    PlanFragmentSerializable pfs = fragments.get(new FragmentHandleSerializable(handle));
+    if(pfs == null) return null;
+    return pfs.getObject();
+  }
+
+  @Override
+  public void storeFragment(PlanFragment fragment) {
+    fragments.put(new FragmentHandleSerializable(fragment.getHandle()), new PlanFragmentSerializable(fragment));
+  }
+
+  @Override
+  public <V extends DrillSerializable> DistributedMultiMap<V> getMultiMap(Class<V> clazz) {
+    Cache<String, DeltaList<V>> cache = manager.getCache(clazz.getName());
+    return new IMulti<V>(cache, clazz);
+  }
+
+  @Override
+  public <V extends DrillSerializable> DistributedMap<V> getMap(Class<V> clazz) {
+    Cache<String, V> c = manager.getCache(clazz.getName());
+    return new IMap<V>(c);
+  }
+
+  @Override
+  public Counter getCounter(String name) {
+    return new JGroupsCounter(counters.getOrCreateCounter(name, 0));
+  }
+
+  private class JGroupsCounter implements Counter{
+    final org.jgroups.blocks.atomic.Counter inner;
+
+    public JGroupsCounter(org.jgroups.blocks.atomic.Counter inner) {
+      super();
+      this.inner = inner;
+    }
+
+    @Override
+    public long get() {
+      return inner.get();
+    }
+
+    @Override
+    public long incrementAndGet() {
+      return inner.incrementAndGet();
+    }
+
+    @Override
+    public long decrementAndGet() {
+      return inner.decrementAndGet();
+    }
+
+  }
+
+  private class IMap<V extends DrillSerializable> implements DistributedMap<V>{
+
+    private Cache<String, V> cache;
+
+
+    public IMap(Cache<String, V> cache) {
+      super();
+      this.cache = cache;
+    }
+
+    @Override
+    public V get(String key) {
+      return cache.get(key);
+    }
+
+    @Override
+    public void put(String key, V value) {
+      cache.put(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value) {
+      cache.putIfAbsent(key, value);
+    }
+
+    @Override
+    public void putIfAbsent(String key, V value, long ttl, TimeUnit timeUnit) {
+      cache.putIfAbsent(key, value, ttl, timeUnit);
+    }
+
+  }
+
+  private class IMulti<V extends DrillSerializable> implements DistributedMultiMap<V>{
+
+    private Cache<String, DeltaList<V>> cache;
+    private Class<V> clazz;
+
+    public IMulti(Cache<String, DeltaList<V>> cache, Class<V> clazz) {
+      super();
+      this.cache = cache;
+      this.clazz = clazz;
+    }
+
+    @Override
+    public Collection<V> get(String key) {
+      return cache.get(key);
+    }
+
+    @Override
+    public void put(String key, V value) {
+      cache.put(key, new DeltaList<V>(value));
+//      cache.getAdvancedCache().applyDelta(key, new DeltaList<V>(value), key);
+    }
+
+  }
+
+
+
+
+  private static class DeltaList<V extends DrillSerializable> extends LinkedList<V> implements DeltaAware, Delta{
+
+    /** The serialVersionUID */
+    private static final long serialVersionUID = 2176345973026460708L;
+
+    public DeltaList(Collection<? extends V> c) {
+       super(c);
+    }
+
+    public DeltaList(V obj) {
+       super();
+       add(obj);
+    }
+
+    @Override
+    public Delta delta() {
+       return new DeltaList<V>(this);
+    }
+
+    @Override
+    public void commit() {
+       this.clear();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public DeltaAware merge(DeltaAware d) {
+       List<V> other = null;
+       if (d != null && d instanceof DeltaList) {
+          other = (List<V>) d;
+          for (V e : this) {
+             other.add(e);
+          }
+          return (DeltaAware) other;
+       } else {
+          return this;
+       }
+    }
+ }
+
+
+//  public void run() {
+//    Config c = new Config();
+//    SerializerConfig sc = new SerializerConfig() //
+//      .setImplementation(new HCVectorAccessibleSerializer(allocator)) //
+//      .setTypeClass(VectorAccessibleSerializable.class);
+//    c.setInstanceName(instanceName);
+//    c.getSerializationConfig().addSerializerConfig(sc);
+//    instance = getInstanceOrCreateNew(c);
+//    workQueueLengths = instance.getTopic("queue-length");
+//    fragments = new HandlePlan(instance);
+//    endpoints = CacheBuilder.newBuilder().maximumSize(2000).build();
+//    workQueueLengths.addMessageListener(new Listener());
+//  }
+}
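
Because ICache implements the same DistributedCache contract as HazelCache, callers can swap
backends without changing their code. A minimal sketch (illustrative only, not part of the
patch; the allocator is again assumed to come from the Drillbit context):

  import org.apache.drill.common.config.DrillConfig;
  import org.apache.drill.exec.cache.DistributedCache;
  import org.apache.drill.exec.cache.infinispan.ICache;
  import org.apache.drill.exec.memory.BufferAllocator;
  import org.apache.drill.exec.proto.BitControl.PlanFragment;

  public class ICacheSketch {
    public static void demo(BufferAllocator allocator) throws Exception {
      DistributedCache cache = new ICache(DrillConfig.create(), allocator);
      cache.run();                                     // connects the forked JGroups channel

      PlanFragment fragment = PlanFragment.getDefaultInstance();   // placeholder fragment
      cache.storeFragment(fragment);
      PlanFragment back = cache.getFragment(fragment.getHandle());

      cache.getCounter("queries").incrementAndGet();   // JGroups COUNTER-backed
      cache.close();
    }
  }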

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8621b682/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
new file mode 100644
index 0000000..5f54f74
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/VAAdvancedExternalizer.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache.infinispan;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+
+import org.apache.drill.exec.cache.CachedVectorContainer;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.infinispan.commons.marshall.AdvancedExternalizer;
+
+import com.google.common.collect.ImmutableSet;
+
+public class VAAdvancedExternalizer implements AdvancedExternalizer<CachedVectorContainer> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VAAdvancedExternalizer.class);
+
+  private BufferAllocator allocator;
+
+
+  public VAAdvancedExternalizer(BufferAllocator allocator) {
+    super();
+    this.allocator = allocator;
+  }
+
+  static final Set<Class<? extends CachedVectorContainer>> CLASSES = //
+      (Set<Class<? extends CachedVectorContainer>>) //
+      (Object) ImmutableSet.of(CachedVectorContainer.class);
+
+  @Override
+  public CachedVectorContainer readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+    int length = in.readInt();
+    byte[] b = new byte[length];
+    in.readFully(b); // read() may return fewer bytes; readFully fills the whole buffer
+    CachedVectorContainer va = new CachedVectorContainer(b, allocator);
+    return va;
+  }
+
+  @Override
+  public void writeObject(ObjectOutput out, CachedVectorContainer va) throws IOException {
+    out.writeInt(va.getData().length);
+    out.write(va.getData());
+  }
+
+  @Override
+  public Integer getId() {
+    // magic number for this class, assume drill uses 3001-3100.
+    return 3001;
+  }
+
+  @Override
+  public Set<Class<? extends CachedVectorContainer>> getTypeClasses() {
+    return CLASSES;
+  }
+}