You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/05/24 18:14:08 UTC
[19/21] git commit: Serialization/Deserialization fixes
Serialization/Deserialization fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/710eeeec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/710eeeec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/710eeeec
Branch: refs/heads/master
Commit: 710eeeec24c7f820fb9298d259d8c1a09253e11d
Parents: 24e3af2
Author: Jacques Nadeau <ja...@apache.org>
Authored: Wed May 21 18:14:26 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri May 23 10:02:53 2014 -0700
----------------------------------------------------------------------
.../cache/LoopedAbstractDrillSerializable.java | 2 +-
.../drill/exec/cache/ProtoSerializable.java | 29 ++-
.../drill/exec/cache/infinispan/ICache.java | 3 +-
.../infinispan/JacksonAdvancedExternalizer.java | 8 +-
.../ProtobufAdvancedExternalizer.java | 2 +-
.../apache/drill/exec/client/DrillClient.java | 13 +-
.../drill/exec/server/RemoteServiceSet.java | 8 +-
.../java/org/apache/drill/BaseTestQuery.java | 14 +-
.../exec/cache/TestCacheSerialization.java | 193 +++++++++++++++++++
.../drill/exec/cache/TestVectorCache.java | 129 -------------
10 files changed, 260 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
index 1de030d..d2a7458 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LoopedAbstractDrillSerializable.java
@@ -45,7 +45,7 @@ abstract class LoopedAbstractDrillSerializable implements DrillSerializable {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
writeToStream(baos);
byte[] ba = baos.toByteArray();
- out.write(ba.length);
+ out.writeInt(ba.length);
out.write(ba);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
index 1538a85..f48aae1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoSerializable.java
@@ -45,12 +45,37 @@ public abstract class ProtoSerializable<V extends Message> extends AbstractStrea
@Override
public void readFromStream(InputStream input) throws IOException {
- obj = protoParser.parseFrom(input);
+ obj = protoParser.parseDelimitedFrom(input);
}
@Override
public void writeToStream(OutputStream output) throws IOException {
- obj.writeTo(output);
+ obj.writeDelimitedTo(output);
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((obj == null) ? 0 : obj.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ ProtoSerializable other = (ProtoSerializable) obj;
+ if (this.obj == null) {
+ if (other.obj != null)
+ return false;
+ } else if (!this.obj.equals(other.obj))
+ return false;
+ return true;
}
public static class PlanFragmentSerializable extends ProtoSerializable<PlanFragment>{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
index 92ce08d..f56f19a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ICache.java
@@ -81,8 +81,9 @@ public class ICache implements DistributedCache{
Configuration c = new ConfigurationBuilder() //
.clustering() //
+
.cacheMode(CacheMode.DIST_ASYNC) //
- .storeAsBinary() //
+ .storeAsBinary().enable() //
.build();
this.manager = new DefaultCacheManager(gc, c);
JGroupsTransport transport = (JGroupsTransport) manager.getCache("first").getAdvancedCache().getRpcManager().getTransport();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
index 81f4877..63a6d62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/JacksonAdvancedExternalizer.java
@@ -46,12 +46,16 @@ public class JacksonAdvancedExternalizer<T> implements AdvancedExternalizer<T>
@Override
public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
- return (T) mapper.readValue(DataInputInputStream.constructInputStream(in), clazz);
+ byte[] bytes = new byte[in.readInt()];
+ in.readFully(bytes);
+ return (T) mapper.readValue(bytes, clazz);
}
@Override
public void writeObject(ObjectOutput out, T object) throws IOException {
- out.write(mapper.writeValueAsBytes(object));
+ byte[] bytes = mapper.writeValueAsBytes(object);
+ out.writeInt(bytes.length);
+ out.write(bytes);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
index 821443a..df97a01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/infinispan/ProtobufAdvancedExternalizer.java
@@ -46,7 +46,7 @@ public class ProtobufAdvancedExternalizer<T extends Message> implements Advanced
@Override
public T readObject(ObjectInput in) throws IOException, ClassNotFoundException {
- return parser.parseDelimitedFrom(DataInputInputStream.constructInputStream(in));
+ return parser.parseFrom(DataInputInputStream.constructInputStream(in));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 92097e7..4755d32 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.ZKClusterCoordinator;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -68,10 +69,11 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private UserProperties props = null;
private volatile ClusterCoordinator clusterCoordinator;
private volatile boolean connected = false;
- private final TopLevelAllocator allocator = new TopLevelAllocator(Long.MAX_VALUE);
+ private final BufferAllocator allocator;
private int reconnectTimes;
private int reconnectDelay;
private final boolean ownsZkConnection;
+ private final boolean ownsAllocator;
public DrillClient() {
this(DrillConfig.create());
@@ -86,7 +88,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{
}
public DrillClient(DrillConfig config, ClusterCoordinator coordinator){
+ this(config, coordinator, null);
+ }
+
+ public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator){
this.ownsZkConnection = coordinator == null;
+ this.ownsAllocator = allocator == null;
+ this.allocator = allocator == null ? new TopLevelAllocator(Long.MAX_VALUE) : allocator;
this.config = config;
this.clusterCoordinator = coordinator;
this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
@@ -180,7 +188,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
}
}
- public TopLevelAllocator getAllocator() {
+ public BufferAllocator getAllocator() {
return allocator;
}
@@ -189,6 +197,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
*/
public void close(){
if(this.client != null) this.client.close();
+ if(this.ownsAllocator && allocator != null) allocator.close();
if(ownsZkConnection){
try {
this.clusterCoordinator.close();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
index 2078107..d64efa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java
@@ -20,11 +20,13 @@ package org.apache.drill.exec.server;
import java.io.Closeable;
import java.io.IOException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.cache.infinispan.ICache;
import org.apache.drill.exec.cache.local.LocalCache;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.LocalClusterCoordinator;
-import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
public class RemoteServiceSet implements Closeable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class);
@@ -63,4 +65,8 @@ public class RemoteServiceSet implements Closeable{
return new RemoteServiceSet(new LocalCache(), new LocalClusterCoordinator());
}
+ public static RemoteServiceSet getServiceSetWithFullCache(DrillConfig config, BufferAllocator allocator) throws Exception{
+ ICache c = new ICache(config, allocator);
+ return new RemoteServiceSet(c, new LocalClusterCoordinator());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index fcd2a3b..e7bc87d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.client.PrintingResultsListener;
import org.apache.drill.exec.client.QuerySubmitter;
import org.apache.drill.exec.client.QuerySubmitter.Format;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.rpc.RpcException;
@@ -52,6 +53,8 @@ import com.google.common.io.Resources;
public class BaseTestQuery extends ExecTest{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
+ private static final String ENABLE_FULL_CACHE = "drill.exec.test.use-full-cache";
+
public final TestRule resetWatcher = new TestWatcher() {
@Override
protected void failed(Throwable e, Description description) {
@@ -68,6 +71,7 @@ public class BaseTestQuery extends ExecTest{
protected static RemoteServiceSet serviceSet;
protected static DrillConfig config;
protected static QuerySubmitter submitter = new QuerySubmitter();
+ protected static BufferAllocator allocator;
static void resetClientAndBit() throws Exception{
closeClient();
@@ -77,7 +81,12 @@ public class BaseTestQuery extends ExecTest{
@BeforeClass
public static void openClient() throws Exception{
config = DrillConfig.create();
- serviceSet = RemoteServiceSet.getLocalServiceSet();
+ allocator = new TopLevelAllocator(config);
+ if(config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)){
+ serviceSet = RemoteServiceSet.getServiceSetWithFullCache(config, allocator);
+ }else{
+ serviceSet = RemoteServiceSet.getLocalServiceSet();
+ }
bit = new Drillbit(config, serviceSet);
bit.run();
client = new DrillClient(config, serviceSet.getCoordinator());
@@ -85,7 +94,7 @@ public class BaseTestQuery extends ExecTest{
}
protected BufferAllocator getAllocator(){
- return client.getAllocator();
+ return allocator;
}
@AfterClass
@@ -93,6 +102,7 @@ public class BaseTestQuery extends ExecTest{
if(client != null) client.close();
if(bit != null) bit.close();
if(serviceSet != null) serviceSet.close();
+ if(allocator != null) allocator.close();
}
protected void runSQL(String sql) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
new file mode 100644
index 0000000..6375d66
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestCacheSerialization.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.ExecTest;
+import org.apache.drill.exec.cache.ProtoSerializable.FragmentHandleSerializable;
+import org.apache.drill.exec.cache.infinispan.ICache;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.apache.drill.exec.server.options.OptionValue.OptionType;
+import org.apache.drill.exec.store.dfs.FileSystemConfig;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.hive12.common.collect.Maps;
+
+public class TestCacheSerialization extends ExecTest {
+
+ private static DistributedCache ICACHE;
+ private static BufferAllocator ALLOCATOR;
+ private static final DrillConfig CONFIG = DrillConfig.create();
+
+ @Test
+ public void testProtobufSerialization() {
+ DistributedMap<FragmentHandleSerializable> map = ICACHE.getMap(FragmentHandleSerializable.class);
+ FragmentHandle h = FragmentHandle.newBuilder().setMajorFragmentId(1).setMinorFragmentId(1).setQueryId(QueryId.newBuilder().setPart1(74).setPart2(66).build()).build();
+ FragmentHandleSerializable s = new FragmentHandleSerializable(h);
+ map.put("1", s);
+ for(int i =0; i < 2; i++){
+ FragmentHandleSerializable s2 = map.get("1");
+ Assert.assertEquals(s.getObject(), s2.getObject());
+ }
+ }
+
+// @Test
+// public void testProtobufExternalizer(){
+// final FragmentStatus fs = FragmentStatus.newBuilder().setHandle(FragmentHandle.newBuilder().setMajorFragmentId(1).setMajorFragmentId(35)).build();
+// DistributedMap<OptionValue> map = ICACHE.getNamedMap(FragmentStatus.class);
+// map.put("1", v);
+// for(int i = 0; i < 5; i++){
+// OptionValue v2 = map.get("1");
+// Assert.assertEquals(v, v2);
+// }
+// }
+
+ @Test
+ public void testJackSerializable(){
+ OptionValue v = OptionValue.createBoolean(OptionType.SESSION, "my test option", true);
+ DistributedMap<OptionValue> map = ICACHE.getNamedMap("sys.options", OptionValue.class);
+ map.put("1", v);
+ for(int i = 0; i < 5; i++){
+ OptionValue v2 = map.get("1");
+ Assert.assertEquals(v, v2);
+ }
+ }
+
+ @Test
+ public void testCustomJsonSerialization(){
+ Map<String, StoragePluginConfig> configs = Maps.newHashMap();
+ configs.put("hello", new FileSystemConfig());
+ StoragePlugins p = new StoragePlugins(configs);
+
+ DistributedMap<StoragePlugins> map = ICACHE.getMap(StoragePlugins.class);
+ map.put("1", p);
+ for(int i =0; i < 2; i++){
+ StoragePlugins p2 = map.get("1");
+ Assert.assertEquals(p, p2);
+ }
+ }
+
+ @Test
+ public void testVectorCache() throws Exception {
+ List<ValueVector> vectorList = Lists.newArrayList();
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
+ Types.required(TypeProtos.MinorType.INT));
+ IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, ALLOCATOR);
+ MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
+ Types.required(TypeProtos.MinorType.VARBINARY));
+ VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, ALLOCATOR);
+ AllocationHelper.allocate(intVector, 4, 4);
+ AllocationHelper.allocate(binVector, 4, 5);
+ vectorList.add(intVector);
+ vectorList.add(binVector);
+
+ intVector.getMutator().setSafe(0, 0);
+ binVector.getMutator().setSafe(0, "ZERO".getBytes());
+ intVector.getMutator().setSafe(1, 1);
+ binVector.getMutator().setSafe(1, "ONE".getBytes());
+ intVector.getMutator().setSafe(2, 2);
+ binVector.getMutator().setSafe(2, "TWO".getBytes());
+ intVector.getMutator().setSafe(3, 3);
+ binVector.getMutator().setSafe(3, "THREE".getBytes());
+ intVector.getMutator().setValueCount(4);
+ binVector.getMutator().setValueCount(4);
+
+ VectorContainer container = new VectorContainer();
+ container.addCollection(vectorList);
+ container.setRecordCount(4);
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+ CachedVectorContainer wrap = new CachedVectorContainer(batch, ALLOCATOR);
+
+ DistributedMultiMap<CachedVectorContainer> mmap = ICACHE.getMultiMap(CachedVectorContainer.class);
+ mmap.put("vectors", wrap);
+
+ for(int x =0; x < 2; x++){
+ CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
+
+ VectorAccessible newContainer = newWrap.get();
+ for (VectorWrapper<?> w : newContainer) {
+ ValueVector vv = w.getValueVector();
+ int values = vv.getAccessor().getValueCount();
+ for (int i = 0; i < values; i++) {
+ Object o = vv.getAccessor().getObject(i);
+ if (o instanceof byte[]) {
+ System.out.println(new String((byte[]) o));
+ } else {
+ System.out.println(o);
+ }
+ }
+ }
+
+ newWrap.clear();
+ }
+ }
+
+ // @Test
+ // public void testHazelVectorCache() throws Exception {
+ // DrillConfig c = DrillConfig.create();
+ // HazelCache cache = new HazelCache(c, new TopLevelAllocator());
+ // cache.run();
+ // testCache(c, cache);
+ // cache.close();
+ // }
+
+ @BeforeClass
+ public static void setupCache() throws Exception {
+ ALLOCATOR = new TopLevelAllocator();
+ ICACHE = new ICache(CONFIG, ALLOCATOR);
+ ICACHE.run();
+ }
+
+ @AfterClass
+ public static void destroyCache() throws Exception {
+ ICACHE.close();
+ ALLOCATOR.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/710eeeec/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
deleted file mode 100644
index 3e0be69..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import java.util.List;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.cache.hazel.HazelCache;
-import org.apache.drill.exec.cache.infinispan.ICache;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
-import org.apache.drill.exec.server.Drillbit;
-import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.RemoteServiceSet;
-import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-public class TestVectorCache extends ExecTest{
-
- private void testCache(DrillConfig config, DistributedCache dcache) throws Exception {
- List<ValueVector> vectorList = Lists.newArrayList();
- RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-
- try (Drillbit bit = new Drillbit(config, serviceSet); DistributedCache cache = dcache) {
- bit.run();
- cache.run();
-
- DrillbitContext context = bit.getContext();
-
-
- MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN),
- Types.required(TypeProtos.MinorType.INT));
- IntVector intVector = (IntVector) TypeHelper.getNewVector(intField, context.getAllocator());
- MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN),
- Types.required(TypeProtos.MinorType.VARBINARY));
- VarBinaryVector binVector = (VarBinaryVector) TypeHelper.getNewVector(binField, context.getAllocator());
- AllocationHelper.allocate(intVector, 4, 4);
- AllocationHelper.allocate(binVector, 4, 5);
- vectorList.add(intVector);
- vectorList.add(binVector);
-
- intVector.getMutator().setSafe(0, 0);
- binVector.getMutator().setSafe(0, "ZERO".getBytes());
- intVector.getMutator().setSafe(1, 1);
- binVector.getMutator().setSafe(1, "ONE".getBytes());
- intVector.getMutator().setSafe(2, 2);
- binVector.getMutator().setSafe(2, "TWO".getBytes());
- intVector.getMutator().setSafe(3, 3);
- binVector.getMutator().setSafe(3, "THREE".getBytes());
- intVector.getMutator().setValueCount(4);
- binVector.getMutator().setValueCount(4);
-
- VectorContainer container = new VectorContainer();
- container.addCollection(vectorList);
- container.setRecordCount(4);
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
- CachedVectorContainer wrap = new CachedVectorContainer(batch, context.getAllocator());
-
- DistributedMultiMap<CachedVectorContainer> mmap = cache.getMultiMap(CachedVectorContainer.class);
- mmap.put("vectors", wrap);
-
- CachedVectorContainer newWrap = (CachedVectorContainer) mmap.get("vectors").iterator().next();
-
- VectorAccessible newContainer = newWrap.get();
- for (VectorWrapper<?> w : newContainer) {
- ValueVector vv = w.getValueVector();
- int values = vv.getAccessor().getValueCount();
- for (int i = 0; i < values; i++) {
- Object o = vv.getAccessor().getObject(i);
- if (o instanceof byte[]) {
- System.out.println(new String((byte[]) o));
- } else {
- System.out.println(o);
- }
- }
- }
-
- newWrap.clear();
- }
-
- }
-
-// @Test
-// public void testHazelVectorCache() throws Exception {
-// DrillConfig c = DrillConfig.create();
-// HazelCache cache = new HazelCache(c, new TopLevelAllocator());
-// cache.run();
-// testCache(c, cache);
-// cache.close();
-// }
-
- @Test
- public void testICache() throws Exception {
- DrillConfig c = DrillConfig.create();
- ICache cache = new ICache(c, new TopLevelAllocator());
- testCache(c, cache);
-
- }
-}