You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:21 UTC

[04/10] git commit: DRILL-271 refactor DistributedCache code. Uses hazel cast 3.1 and custom serialization.

DRILL-271 refactor DistributedCache code. Uses hazel cast 3.1 and custom serialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d529352e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d529352e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d529352e

Branch: refs/heads/master
Commit: d529352e03410e1612db691677dc90c855342c01
Parents: 266d248
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Oct 29 20:24:00 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:38 2013 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   2 +-
 .../exec/cache/HCDrillSerializableWrapper.java  |  63 -------
 .../cache/HCSerializableWrapperClasses.java     |  31 ----
 .../cache/HCVectorAccessibleSerializer.java     |  56 ++++++
 .../org/apache/drill/exec/cache/HazelCache.java |  37 ++--
 .../org/apache/drill/exec/cache/LocalCache.java |  12 +-
 .../apache/drill/exec/cache/ProtoBufWrap.java   |   8 +-
 .../cache/VectorAccessibleSerializable.java     | 184 ++++++++++++++++++
 .../exec/cache/VectorContainerSerializable.java | 186 -------------------
 .../OrderedPartitionRecordBatch.java            |  20 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   6 +-
 .../drill/exec/cache/TestVectorCache.java       |   8 +-
 .../drill/exec/cache/TestWriteToDisk.java       | 108 +++++++++++
 13 files changed, 403 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 063e60e..c5b169d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -185,7 +185,7 @@
     <dependency>
       <groupId>com.hazelcast</groupId>
       <artifactId>hazelcast</artifactId>
-      <version>2.5.1</version>
+      <version>3.1</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.janino</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
deleted file mode 100644
index 3f2c41c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.DataSerializable;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public abstract class HCDrillSerializableWrapper implements DataSerializable {
-
-  private DrillSerializable obj;
-
-  public HCDrillSerializableWrapper() {}
-
-  public HCDrillSerializableWrapper(DrillSerializable obj) {
-    this.obj = obj;
-  }
-
-  public void readData(DataInput in) throws IOException {
-    obj.read(in);
-  }
-
-  public void writeData(DataOutput out) throws IOException {
-    obj.write(out);
-  }
-
-  public DrillSerializable get() {
-    return obj;
-  }
-
-  /**
-   *  This is a method that will get a Class specific implementation of HCDrillSerializableWrapper. Class specific implentations
-   *  are necessary because Hazel Cast requires object that have constructors with no parameters.
-   * @param value
-   * @param clazz
-   * @return
-   */
-  public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) {
-    if (clazz.equals(VectorContainerSerializable.class)) {
-      return new HCSerializableWrapperClasses.HCVectorListSerializable(value);
-    } else {
-      throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
deleted file mode 100644
index d22723a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-public class HCSerializableWrapperClasses {
-  public static class HCVectorListSerializable extends HCDrillSerializableWrapper {
-
-    public HCVectorListSerializable() {
-      super(new VectorContainerSerializable());
-    }
-
-    public HCVectorListSerializable(DrillSerializable obj) {
-      super(obj);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..0d5ba96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+/**
+ * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+  private BufferAllocator allocator;
+
+  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+    va.readFromStream(DataInputInputStream.constructInputStream(in));
+    return va;
+  }
+
+  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+  }
+
+  public void destroy() {}
+
+  public int getTypeId() {
+    return 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 577dfeb..9dd4373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,12 +22,15 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.hazelcast.config.SerializerConfig;
 import com.hazelcast.core.*;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
 import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
@@ -35,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.hazelcast.config.Config;
+import org.apache.drill.exec.server.DrillbitContext;
 
 public class HazelCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -44,9 +48,11 @@ public class HazelCache implements DistributedCache {
   private ITopic<HWorkQueueStatus> workQueueLengths;
   private HandlePlan fragments;
   private Cache<WorkQueueStatus, Integer>  endpoints;
-  
-  public HazelCache(DrillConfig config) {
+  private BufferAllocator allocator;
+
+  public HazelCache(DrillConfig config, BufferAllocator allocator) {
     this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+    this.allocator = allocator;
   }
 
   private class Listener implements MessageListener<HWorkQueueStatus>{
@@ -61,7 +67,10 @@ public class HazelCache implements DistributedCache {
   
   public void run() {
     Config c = new Config();
+    SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
+            .setTypeClass(VectorAccessibleSerializable.class);
     c.setInstanceName(instanceName);
+    c.getSerializationConfig().addSerializerConfig(sc);
     instance = getInstanceOrCreateNew(c);
     workQueueLengths = instance.getTopic("queue-length");
     fragments = new HandlePlan(instance);
@@ -120,11 +129,11 @@ public class HazelCache implements DistributedCache {
 
   @Override
   public Counter getCounter(String name) {
-    return new HCCounterImpl(this.instance.getAtomicNumber(name));
+    return new HCCounterImpl(this.instance.getAtomicLong(name));
   }
 
   public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
-    private IMap<String, HCDrillSerializableWrapper> m;
+    private IMap<String, DrillSerializable> m;
     private Class<V> clazz;
 
     public HCDistributedMapImpl(IMap m, Class<V> clazz) {
@@ -133,24 +142,24 @@ public class HazelCache implements DistributedCache {
     }
 
     public DrillSerializable get(String key) {
-      return m.get(key).get();
+      return m.get(key);
     }
 
     public void put(String key, DrillSerializable value) {
-      m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      m.put(key, value);
     }
 
     public void putIfAbsent(String key, DrillSerializable value) {
-      m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      m.putIfAbsent(key, value);
     }
 
     public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
-      m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit);
+      m.putIfAbsent(key, value, ttl, timeunit);
     }
   }
 
   public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
-    private com.hazelcast.core.MultiMap<String, HCDrillSerializableWrapper> mmap;
+    private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
     private Class<V> clazz;
 
     public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
@@ -160,22 +169,22 @@ public class HazelCache implements DistributedCache {
 
     public Collection<DrillSerializable> get(String key) {
       List<DrillSerializable> list = Lists.newArrayList();
-      for (HCDrillSerializableWrapper v : mmap.get(key)) {
-        list.add(v.get());
+      for (DrillSerializable v : mmap.get(key)) {
+        list.add(v);
       }
       return list;
     }
 
     @Override
     public void put(String key, DrillSerializable value) {
-      mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      mmap.put(key, value);
     }
   }
 
   public static class HCCounterImpl implements Counter {
-    private AtomicNumber n;
+    private IAtomicLong n;
 
-    public HCCounterImpl(AtomicNumber n) {
+    public HCCounterImpl(IAtomicLong n) {
       this.n = n;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 7ad6ec6..e6275c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.cache;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,15 @@ import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
 
 public class LocalCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
@@ -44,7 +49,8 @@ public class LocalCache implements DistributedCache {
   private volatile ConcurrentMap<Class, DistributedMap> maps;
   private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
   private volatile ConcurrentMap<String, Counter> counters;
-  
+  private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
   @Override
   public void close() throws IOException {
     handles = null;
@@ -116,10 +122,10 @@ public class LocalCache implements DistributedCache {
   public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
     ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
     try {
-      DrillSerializable obj = (DrillSerializable)clazz.newInstance();
+      DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
       obj.read(in);
       return obj;
-    } catch (InstantiationException | IllegalAccessException | IOException e) {
+    } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
       throw new RuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
index 4aea645..448eecd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -21,11 +21,13 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
-import com.hazelcast.nio.DataSerializable;
+import com.hazelcast.nio.serialization.DataSerializable;
 
 public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
@@ -43,7 +45,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
   }
   
   @Override
-  public void readData(DataInput arg0) throws IOException {
+  public void readData(ObjectDataInput arg0) throws IOException {
     int len = arg0.readShort();
     byte[] b = new byte[len];
     arg0.readFully(b);
@@ -51,7 +53,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
   }
 
   @Override
-  public void writeData(DataOutput arg0) throws IOException {
+  public void writeData(ObjectDataOutput arg0) throws IOException {
     byte[] b = value.toByteArray();
     if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
     arg0.writeShort(b.length);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
new file mode 100644
index 0000000..24387d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+
+import java.io.*;
+import java.util.List;
+
+public class VectorAccessibleSerializable implements DrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
+
+  private VectorAccessible va;
+  private BufferAllocator allocator;
+  private int recordCount = -1;
+  private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+  private SelectionVector2 sv2;
+
+  /**
+   *
+   * @param va
+   */
+  public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
+    this.va = va;
+    this.allocator = allocator;
+  }
+
+  public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
+    this.va = va;
+    this.allocator = allocator;
+    this.sv2 = sv2;
+    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+  }
+
+  public VectorAccessibleSerializable(BufferAllocator allocator) {
+    this.va = new VectorContainer();
+    this.allocator = allocator;
+  }
+
+  @Override
+  public void read(DataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+  
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    VectorContainer container = new VectorContainer();
+    UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+      sv2.allocateNew(recordCount * 2);
+      sv2.getBuffer().setBytes(0, input, recordCount * 2);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
+    List<ValueVector> vectorList = Lists.newArrayList();
+    List<FieldMetadata> fieldList = batchDef.getFieldList();
+    for (FieldMetadata metaData : fieldList) {
+      int dataLength = metaData.getBufferLength();
+      byte[] bytes = new byte[dataLength];
+      input.read(bytes);
+      MaterializedField field = MaterializedField.create(metaData.getDef());
+      ByteBuf buf = allocator.buffer(dataLength);
+      buf.setBytes(0, bytes);
+      ValueVector vector = TypeHelper.getNewVector(field, allocator);
+      vector.load(metaData, buf);
+      vectorList.add(vector);
+    }
+    container.addCollection(vectorList);
+    container.buildSchema(svMode);
+    container.setRecordCount(recordCount);
+    va = container;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+    ByteBuf[] incomingBuffers = batch.getBuffers();
+    UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+        /* ByteBuf associated with the selection vector */
+    ByteBuf svBuf = null;
+
+        /* Size of the selection vector */
+    int svCount = 0;
+
+    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+    {
+      svCount = sv2.getCount();
+      svBuf = sv2.getBuffer();
+    }
+
+    int totalBufferLength = 0;
+
+    try
+    {
+            /* Write the metadata to the file */
+      batchDef.writeDelimitedTo(output);
+
+            /* If we have a selection vector, dump it to file first */
+      if (svBuf != null)
+      {
+
+                /* For writing to the selection vectors we use
+                 * setChar() method which does not modify the
+                 * reader and writer index. To copy the entire buffer
+                 * without having to get each byte individually we need
+                 * to set the writer index
+                 */
+        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+//        fc.write(svBuf.nioBuffers());
+        svBuf.getBytes(0, output, svBuf.readableBytes());
+        svBuf.release();
+      }
+
+            /* Dump the array of ByteBuf's associated with the value vectors */
+      for (ByteBuf buf : incomingBuffers)
+      {
+                /* dump the buffer into the file channel */
+        int bufLength = buf.readableBytes();
+        buf.getBytes(0, output, bufLength);
+
+                /* compute total length of buffer, will be used when
+                 * we create a compound buffer
+                 */
+        totalBufferLength += buf.readableBytes();
+        buf.release();
+      }
+
+      output.flush();
+      context.stop();
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    } finally {
+      clear();
+    }
+  }
+
+  private void clear() {
+  }
+
+  public VectorAccessible get() {
+    return va;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
deleted file mode 100644
index 5813dd6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.collect.Lists;
-import com.yammer.metrics.MetricRegistry;
-import com.yammer.metrics.Timer;
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-
-import java.io.*;
-import java.util.List;
-
-public class VectorContainerSerializable implements DrillSerializable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
-  static final MetricRegistry metrics = DrillMetrics.getInstance();
-  static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
-
-  private VectorAccessible va;
-  private BootStrapContext context;
-  private int recordCount = -1;
-  private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
-  private SelectionVector2 sv2;
-
-  /**
-   *
-   * @param va
-   */
-  public VectorContainerSerializable(VectorAccessible va){
-    this.va = va;
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-  }
-
-  public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
-    this.va = va;
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-    this.sv2 = sv2;
-    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-  }
-
-  public VectorContainerSerializable() {
-    this.va = new VectorContainer();
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-  }
-
-  @Override
-  public void read(DataInput input) throws IOException {
-    readFromStream(DataInputInputStream.constructInputStream(input));
-  }
-  
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    VectorContainer container = new VectorContainer();
-    UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
-    recordCount = batchDef.getRecordCount();
-    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
-      sv2.allocateNew(recordCount * 2);
-      sv2.getBuffer().setBytes(0, input, recordCount * 2);
-      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-    }
-    List<ValueVector> vectorList = Lists.newArrayList();
-    List<FieldMetadata> fieldList = batchDef.getFieldList();
-    for (FieldMetadata metaData : fieldList) {
-      int dataLength = metaData.getBufferLength();
-      byte[] bytes = new byte[dataLength];
-      input.read(bytes);
-      MaterializedField field = MaterializedField.create(metaData.getDef());
-      ByteBuf buf = context.getAllocator().buffer(dataLength);
-      buf.setBytes(0, bytes);
-      ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
-      vector.load(metaData, buf);
-      vectorList.add(vector);
-    }
-    container.addCollection(vectorList);
-    container.buildSchema(svMode);
-    container.setRecordCount(recordCount);
-    va = container;
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    writeToStream(DataOutputOutputStream.constructOutputStream(output));
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
-
-    ByteBuf[] incomingBuffers = batch.getBuffers();
-    UserBitShared.RecordBatchDef batchDef = batch.getDef();
-
-        /* ByteBuf associated with the selection vector */
-    ByteBuf svBuf = null;
-
-        /* Size of the selection vector */
-    int svCount = 0;
-
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
-    {
-      svCount = sv2.getCount();
-      svBuf = sv2.getBuffer();
-    }
-
-    int totalBufferLength = 0;
-
-    try
-    {
-            /* Write the metadata to the file */
-      batchDef.writeDelimitedTo(output);
-
-            /* If we have a selection vector, dump it to file first */
-      if (svBuf != null)
-      {
-
-                /* For writing to the selection vectors we use
-                 * setChar() method which does not modify the
-                 * reader and writer index. To copy the entire buffer
-                 * without having to get each byte individually we need
-                 * to set the writer index
-                 */
-        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-//        fc.write(svBuf.nioBuffers());
-        svBuf.getBytes(0, output, svBuf.readableBytes());
-        svBuf.release();
-      }
-
-            /* Dump the array of ByteBuf's associated with the value vectors */
-      for (ByteBuf buf : incomingBuffers)
-      {
-                /* dump the buffer into the file channel */
-        int bufLength = buf.readableBytes();
-        buf.getBytes(0, output, bufLength);
-
-                /* compute total length of buffer, will be used when
-                 * we create a compound buffer
-                 */
-        totalBufferLength += buf.readableBytes();
-        buf.release();
-      }
-
-      output.flush();
-      context.stop();
-    } catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    } finally {
-      clear();
-    }
-  }
-
-  private void clear() {
-  }
-
-  public VectorAccessible get() {
-    return va;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ef3886c..e39b82e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -82,7 +82,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   private boolean startedUnsampledBatches = false;
   private boolean upstreamNone = false;
   private int recordCount;
-  private DistributedMap<VectorContainerSerializable> tableMap;
+  private DistributedMap<VectorAccessibleSerializable> tableMap;
   private DistributedMultiMap mmap;
   private String mapKey;
 
@@ -154,13 +154,13 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
       DistributedCache cache = context.getDrillbitContext().getCache();
       mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
-      mmap = cache.getMultiMap(VectorContainerSerializable.class);
+      mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
       List<ValueVector> vectorList = Lists.newArrayList();
       for (VectorWrapper vw : containerToCache) {
         vectorList.add(vw.getValueVector());
       }
 
-      VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
 
       mmap.put(mapKey, wrap);
       wrap = null;
@@ -169,24 +169,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
       long val = minorFragmentSampleCount.incrementAndGet();
       logger.debug("Incremented mfsc, got {}", val);
-      tableMap = cache.getMap(VectorContainerSerializable.class);
+      tableMap = cache.getMap(VectorAccessibleSerializable.class);
       Preconditions.checkNotNull(tableMap);
 
       if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
         buildTable();
-        wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+        wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
       } else if (val < Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
         // Wait until sufficient number of fragments have submitted samples, or proceed after 100 ms passed
         for (int i = 0; i < 100 && wrap == null; i++) {
           Thread.sleep(10);
-          wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+          wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
           if (i == 99) {
             buildTable();
-            wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+            wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
           }
         }
       } else {
-        wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+        wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
       }
 
       Preconditions.checkState(wrap != null);
@@ -211,7 +211,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
     SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer);
     for (DrillSerializable w : allSamplesWrap) {
-      containerBuilder.add(((VectorContainerSerializable)w).get());
+      containerBuilder.add(((VectorAccessibleSerializable)w).get());
     }
     containerBuilder.build(context);
 
@@ -239,7 +239,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     }
     candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
 
-    VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
 
     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 052f415..49732d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -70,7 +70,7 @@ public class Drillbit implements Closeable{
   final DistributedCache cache;
   final WorkManager manager;
   final BootStrapContext context;
-  
+
   private volatile RegistrationHandle handle;
 
   public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
@@ -86,15 +86,15 @@ public class Drillbit implements Closeable{
       this.manager = new WorkManager(context);
       this.coord = new ZKClusterCoordinator(config);
       this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
-      this.cache = new HazelCache(config);
+      this.cache = new HazelCache(config, context.getAllocator());
     }
   }
 
   public void run() throws Exception {
     coord.start(10000);
     DrillbitEndpoint md = engine.start();
-    cache.run();
     manager.start(md, cache, engine.getBitCom(), coord);
+    cache.run();
     handle = coord.register(md);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index ffc0274..94aa3dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -46,7 +46,7 @@ public class TestVectorCache {
     Drillbit bit = new Drillbit(config, serviceSet);
     bit.run();
     DrillbitContext context = bit.getContext();
-    HazelCache cache = new HazelCache(config);
+    HazelCache cache = new HazelCache(config, context.getAllocator());
     cache.run();
 
     MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
@@ -68,11 +68,11 @@ public class TestVectorCache {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorContainerSerializable wrap = new VectorContainerSerializable(container);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
 
-    DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
+    DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
     mmap.put("vectors", wrap);
-    VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
+    VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
 
     VectorAccessible newContainer = newWrap.get();
     for (VectorWrapper w : newContainer) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
new file mode 100644
index 0000000..11d15d8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestWriteToDisk {
+
+  @Test
+  public void test() throws Exception {
+    List<ValueVector> vectorList = Lists.newArrayList();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    DrillConfig config = DrillConfig.create();
+    Drillbit bit = new Drillbit(config, serviceSet);
+    bit.run();
+    DrillbitContext context = bit.getContext();
+
+    MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
+    IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
+    MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
+    VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
+    AllocationHelper.allocate(intVector, 4, 4);
+    AllocationHelper.allocate(binVector, 4, 5);
+    vectorList.add(intVector);
+    vectorList.add(binVector);
+
+    intVector.getMutator().set(0, 0); binVector.getMutator().set(0, "ZERO".getBytes());
+    intVector.getMutator().set(1, 1); binVector.getMutator().set(1, "ONE".getBytes());
+    intVector.getMutator().set(2, 2); binVector.getMutator().set(2, "TWO".getBytes());
+    intVector.getMutator().set(3, 3); binVector.getMutator().set(3, "THREE".getBytes());
+    intVector.getMutator().setValueCount(4);
+    binVector.getMutator().setValueCount(4);
+
+    VectorContainer container = new VectorContainer();
+    container.addCollection(vectorList);
+    container.setRecordCount(4);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+
+    Configuration conf = new Configuration();
+    conf.set("fs.name.default", "file:///");
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path("/tmp/drillSerializable");
+    if (fs.exists(path)) fs.delete(path, false);
+    FSDataOutputStream out = fs.create(path);
+
+    wrap.writeToStream(out);
+    out.close();
+
+    FSDataInputStream in = fs.open(path);
+    VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(context.getAllocator());
+    newWrap.readFromStream(in);
+    fs.close();
+
+    VectorAccessible newContainer = newWrap.get();
+    for (VectorWrapper w : newContainer) {
+      ValueVector vv = w.getValueVector();
+      int values = vv.getAccessor().getValueCount();
+      for (int i = 0; i < values; i++) {
+        Object o = vv.getAccessor().getObject(i);
+        if (o instanceof byte[]) {
+          System.out.println(new String((byte[])o));
+        } else {
+          System.out.println(o);
+        }
+      }
+    }
+  }
+}