You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/05/24 18:14:08 UTC

[19/21] git commit: Serialization/Deserialization fixes

Serialization/Deserialization fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/710eeeec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/710eeeec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/710eeeec

Branch: refs/heads/master
Commit: 710eeeec24c7f820fb9298d259d8c1a09253e11d
Parents: 24e3af2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 21 18:14:26 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 23 10:02:53 2014 -0700

----------------------------------------------------------------------
 .../cache/LoopedAbstractDrillSerializable.java  |   2 +-
 .../drill/exec/cache/ProtoSerializable.java     |  29 ++-
 .../drill/exec/cache/infinispan/ICache.java     |   3 +-
 .../infinispan/JacksonAdvancedExternalizer.java |   8 +-
 .../ProtobufAdvancedExternalizer.java           |   2 +-
 .../apache/drill/exec/client/DrillClient.java   |  13 +-
 .../drill/exec/server/RemoteServiceSet.java     |   8 +-
 .../java/org/apache/drill/BaseTestQuery.java    |  14 +-
 .../exec/cache/TestCacheSerialization.java      | 193 +++++++++++++++++++
 .../drill/exec/cache/TestVectorCache.java       | 129 -------------
 10 files changed, 260 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
index 1de030d..d2a7458 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
@@ -45,7 +45,7 @@ abstract class LoopedAbstractDrillSerializable implements DrillSerializable {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     writeToStream(baos);
     byte[] ba = baos.toByteArray();
-    out.write(ba.length);
+    out.writeInt(ba.length);
     out.write(ba);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
index 1538a85..f48aae1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
@@ -45,12 +45,37 @@ public abstract class ProtoSerializable<V extends Message> extends AbstractStrea
 
   @Override
   public void readFromStream(InputStream input) throws IOException {
-    obj = protoParser.parseFrom(input);
+    obj = protoParser.parseDelimitedFrom(input);
   }
 
   @Override
   public void writeToStream(OutputStream output) throws IOException {
-    obj.writeTo(output);
+    obj.writeDelimitedTo(output);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((obj == null) ? 0 : obj.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    ProtoSerializable other = (ProtoSerializable) obj;
+    if (this.obj == null) {
+      if (other.obj != null)
+        return false;
+    } else if (!this.obj.equals(other.obj))
+      return false;
+    return true;
   }
 
   public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index 92ce08d..f56f19a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -81,8 +81,9 @@ public class ICache implements DistributedCache{
 
     Configuration c = new ConfigurationBuilder() //
       .clustering() //
+
       .cacheMode(CacheMode.DIST_ASYNC) //
-      .storeAsBinary() //
+      .storeAsBinary().enable() //
       .build();
     this.manager = new DefaultCacheManager(gc, c);
     JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
index 81f4877..63a6d62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
@@ -46,12 +46,16 @@ public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T>
 
   @Override
   public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
-    return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
+    byte[] bytes = new byte[in.readInt()];
+    in.readFully(bytes);
+    return (T) mapper.readValue(bytes, clazz);
   }
 
   @Override
   public void writeObject(ObjectOutput out, T object) throws IOException {
-    out.write(mapper.writeValueAsBytes(object));
+    byte[] bytes = mapper.writeValueAsBytes(object);
+    out.writeInt(bytes.length);
+    out.write(bytes);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
index 821443a..df97a01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
@@ -46,7 +46,7 @@ public class ProtobufAdvancedExternalizer<T extends Message> implements Advanced
 
   @Override
   public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
-    return parser.parseDelimitedFrom(DataInputInputStream.constructInputStream(in));
+    return parser.parseFrom(DataInputInputStream.constructInputStream(in));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 92097e7..4755d32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -68,10 +69,11 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   private UserProperties props = null;
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
-  private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE);
+  private final BufferAllocator allocator;
   private int reconnectTimes;
   private int reconnectDelay;
   private final boolean ownsZkConnection;
+  private final boolean ownsAllocator;
 
   public DrillClient() {
     this(DrillConfig.create());
@@ -86,7 +88,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{
   }
 
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator){
+    this(config, coordinator, null);
+  }
+
+  public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator){
     this.ownsZkConnection = coordinator == null;
+    this.ownsAllocator = allocator == null;
+    this.allocator = allocator == null ? new TopLevelAllocator(Long.MAX_VALUE) : allocator;
     this.config = config;
     this.clusterCoordinator = coordinator;
     this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
@@ -180,7 +188,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
     }
   }
 
-  public TopLevelAllocator getAllocator() {
+  public BufferAllocator getAllocator() {
     return allocator;
   }
 
@@ -189,6 +197,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
    */
   public void close(){
     if(this.client != null) this.client.close();
+    if(this.ownsAllocator && allocator != null) allocator.close();
     if(ownsZkConnection){
       try {
         this.clusterCoordinator.close();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 2078107..d64efa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -20,11 +20,13 @@ package org.apache.drill.exec.server;
 import java.io.Closeable;
 import java.io.IOException;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.infinispan.ICache;
 import org.apache.drill.exec.cache.local.LocalCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.LocalClusterCoordinator;
-import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 
 public class RemoteServiceSet implements Closeable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -63,4 +65,8 @@ public class RemoteServiceSet implements Closeable{
     return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
   }
 
+  public static RemoteServiceSet getServiceSetWithFullCache(DrillConfig config, BufferAllocator allocator) throws Exception{
+    ICache c = new ICache(config, allocator);
+    return new RemoteServiceSet(c, new LocalClusterCoordinator());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index fcd2a3b..e7bc87d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.client.PrintingResultsListener;
 import org.apache.drill.exec.client.QuerySubmitter;
 import org.apache.drill.exec.client.QuerySubmitter.Format;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
 import org.apache.drill.exec.rpc.RpcException;
@@ -52,6 +53,8 @@ import com.google.common.io.Resources;
 public class BaseTestQuery extends ExecTest{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
+  private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
+
   public final TestRule resetWatcher = new TestWatcher() {
     @Override
     protected void failed(Throwable e, Description description) {
@@ -68,6 +71,7 @@ public class BaseTestQuery extends ExecTest{
   protected static RemoteServiceSet serviceSet;
   protected static DrillConfig config;
   protected static QuerySubmitter submitter = new QuerySubmitter();
+  protected static BufferAllocator allocator;
 
   static void resetClientAndBit() throws Exception{
     closeClient();
@@ -77,7 +81,12 @@ public class BaseTestQuery extends ExecTest{
   @BeforeClass
   public static void openClient() throws Exception{
     config = DrillConfig.create();
-    serviceSet = RemoteServiceSet.getLocalServiceSet();
+    allocator = new TopLevelAllocator(config);
+    if(config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)){
+      serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
+    }else{
+      serviceSet = RemoteServiceSet.getLocalServiceSet();
+    }
     bit = new Drillbit(config, serviceSet);
     bit.run();
     client = new DrillClient(config, serviceSet.getCoordinator());
@@ -85,7 +94,7 @@ public class BaseTestQuery extends ExecTest{
   }
 
   protected BufferAllocator getAllocator(){
-    return client.getAllocator();
+    return allocator;
   }
 
   @AfterClass
@@ -93,6 +102,7 @@ public class BaseTestQuery extends ExecTest{
     if(client != null) client.close();
     if(bit != null) bit.close();
     if(serviceSet != null) serviceSet.close();
+    if(allocator != null) allocator.close();
   }
 
   protected void runSQL(String sql) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
new file mode 100644
index 0000000..6375d66
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
+import org.apache.drill.exec.cache.infinispan.ICache;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.hive12.common.collect.Maps;
+
+public class TestCacheSerialization extends ExecTest {
+
+  private static DistributedCache ICACHE;
+  private static BufferAllocator ALLOCATOR;
+  private static final DrillConfig CONFIG = DrillConfig.create();
+
+  @Test
+  public void testProtobufSerialization() {
+    DistributedMap<FragmentHandleSerializable> map = ICACHE.getMap(FragmentHandleSerializable.class);
+    FragmentHandle h = FragmentHandle.newBuilder().setMajorFragmentId(1).setMinorFragmentId(1).setQueryId(QueryId.newBuilder().setPart1(74).setPart2(66).build()).build();
+    FragmentHandleSerializable s = new FragmentHandleSerializable(h);
+    map.put("1", s);
+    for(int i =0; i < 2; i++){
+      FragmentHandleSerializable s2 = map.get("1");
+      Assert.assertEquals(s.getObject(), s2.getObject());
+    }
+  }
+
+//  @Test
+//  public void testProtobufExternalizer(){
+//    final FragmentStatus fs = FragmentStatus.newBuilder().setHandle(FragmentHandle.newBuilder().setMajorFragmentId(1).setMajorFragmentId(35)).build();
+//    DistributedMap<OptionValue> map = ICACHE.getNamedMap(FragmentStatus.class);
+//    map.put("1", v);
+//    for(int i = 0; i < 5; i++){
+//      OptionValue v2 = map.get("1");
+//      Assert.assertEquals(v, v2);
+//    }
+//  }
+
+  @Test
+  public void testJackSerializable(){
+    OptionValue v = OptionValue.createBoolean(OptionType.SESSION, "my test option", true);
+    DistributedMap<OptionValue> map = ICACHE.getNamedMap("sys.options", OptionValue.class);
+    map.put("1", v);
+    for(int i = 0; i < 5; i++){
+      OptionValue v2 = map.get("1");
+      Assert.assertEquals(v, v2);
+    }
+  }
+
+  @Test
+  public void testCustomJsonSerialization(){
+    Map<String, StoragePluginConfig> configs = Maps.newHashMap();
+    configs.put("hello", new FileSystemConfig());
+    StoragePlugins p = new StoragePlugins(configs);
+
+    DistributedMap<StoragePlugins> map = ICACHE.getMap(StoragePlugins.class);
+    map.put("1", p);
+    for(int i =0; i < 2; i++){
+      StoragePlugins p2 = map.get("1");
+      Assert.assertEquals(p, p2);
+    }
+  }
+
+  @Test
+  public void testVectorCache() throws Exception {
+    List<ValueVector> vectorList = Lists.newArrayList();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
+        Types.required(TypeProtos.MinorType.INT));
+    IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, ALLOCATOR);
+    MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
+        Types.required(TypeProtos.MinorType.VARBINARY));
+    VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, ALLOCATOR);
+    AllocationHelper.allocate(intVector, 4, 4);
+    AllocationHelper.allocate(binVector, 4, 5);
+    vectorList.add(intVector);
+    vectorList.add(binVector);
+
+    intVector.getMutator().setSafe(0, 0);
+    binVector.getMutator().setSafe(0, "ZERO".getBytes());
+    intVector.getMutator().setSafe(1, 1);
+    binVector.getMutator().setSafe(1, "ONE".getBytes());
+    intVector.getMutator().setSafe(2, 2);
+    binVector.getMutator().setSafe(2, "TWO".getBytes());
+    intVector.getMutator().setSafe(3, 3);
+    binVector.getMutator().setSafe(3, "THREE".getBytes());
+    intVector.getMutator().setValueCount(4);
+    binVector.getMutator().setValueCount(4);
+
+    VectorContainer container = new VectorContainer();
+    container.addCollection(vectorList);
+    container.setRecordCount(4);
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+    CachedVectorContainer wrap = new CachedVectorContainer(batch, ALLOCATOR);
+
+    DistributedMultiMap<CachedVectorContainer> mmap = ICACHE.getMultiMap(CachedVectorContainer.class);
+    mmap.put("vectors", wrap);
+
+    for(int x =0; x < 2; x++){
+      CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
+
+      VectorAccessible newContainer = newWrap.get();
+      for (VectorWrapper<?> w : newContainer) {
+        ValueVector vv = w.getValueVector();
+        int values = vv.getAccessor().getValueCount();
+        for (int i = 0; i < values; i++) {
+          Object o = vv.getAccessor().getObject(i);
+          if (o instanceof byte[]) {
+            System.out.println(new String((byte[]) o));
+          } else {
+            System.out.println(o);
+          }
+        }
+      }
+
+      newWrap.clear();
+    }
+  }
+
+  // @Test
+  // public void testHazelVectorCache() throws Exception {
+  // DrillConfig c = DrillConfig.create();
+  // HazelCache cache = new HazelCache(c, new TopLevelAllocator());
+  // cache.run();
+  // testCache(c, cache);
+  // cache.close();
+  // }
+
+  @BeforeClass
+  public static void setupCache() throws Exception {
+    ALLOCATOR = new TopLevelAllocator();
+    ICACHE = new ICache(CONFIG, ALLOCATOR);
+    ICACHE.run();
+  }
+
+  @AfterClass
+  public static void destroyCache() throws Exception {
+    ICACHE.close();
+    ALLOCATOR.close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
deleted file mode 100644
index 3e0be69..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.cache.hazel.HazelCache;
-import org.apache.drill.exec.cache.infinispan.ICache;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestVectorCache extends ExecTest{
-
-  private void testCache(DrillConfig config, DistributedCache dcache) throws Exception {
-    List<ValueVector> vectorList = Lists.newArrayList();
-    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
-    try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache) {
-      bit.run();
-      cache.run();
-
-      DrillbitContext context = bit.getContext();
-
-
-      MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
-          Types.required(TypeProtos.MinorType.INT));
-      IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator());
-      MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
-          Types.required(TypeProtos.MinorType.VARBINARY));
-      VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator());
-      AllocationHelper.allocate(intVector, 4, 4);
-      AllocationHelper.allocate(binVector, 4, 5);
-      vectorList.add(intVector);
-      vectorList.add(binVector);
-
-      intVector.getMutator().setSafe(0, 0);
-      binVector.getMutator().setSafe(0, "ZERO".getBytes());
-      intVector.getMutator().setSafe(1, 1);
-      binVector.getMutator().setSafe(1, "ONE".getBytes());
-      intVector.getMutator().setSafe(2, 2);
-      binVector.getMutator().setSafe(2, "TWO".getBytes());
-      intVector.getMutator().setSafe(3, 3);
-      binVector.getMutator().setSafe(3, "THREE".getBytes());
-      intVector.getMutator().setValueCount(4);
-      binVector.getMutator().setValueCount(4);
-
-      VectorContainer container = new VectorContainer();
-      container.addCollection(vectorList);
-      container.setRecordCount(4);
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
-      CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator());
-
-      DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class);
-      mmap.put("vectors", wrap);
-
-      CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
-
-      VectorAccessible newContainer = newWrap.get();
-      for (VectorWrapper<?> w : newContainer) {
-        ValueVector vv = w.getValueVector();
-        int values = vv.getAccessor().getValueCount();
-        for (int i = 0; i < values; i++) {
-          Object o = vv.getAccessor().getObject(i);
-          if (o instanceof byte[]) {
-            System.out.println(new String((byte[]) o));
-          } else {
-            System.out.println(o);
-          }
-        }
-      }
-
-      newWrap.clear();
-    }
-
-  }
-
-//  @Test
-//  public void testHazelVectorCache() throws Exception {
-//    DrillConfig c = DrillConfig.create();
-//    HazelCache cache = new HazelCache(c, new TopLevelAllocator());
-//    cache.run();
-//    testCache(c, cache);
-//    cache.close();
-//  }
-
-  @Test
-  public void testICache() throws Exception {
-    DrillConfig c = DrillConfig.create();
-    ICache cache = new ICache(c, new TopLevelAllocator());
-    testCache(c, cache);
-
-  }
-}