You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:21 UTC
[04/10] git commit: DRILL-271 refactor DistributedCache code. Uses
Hazelcast 3.1 and custom serialization.
DRILL-271 refactor DistributedCache code. Uses Hazelcast 3.1 and custom serialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d529352e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d529352e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d529352e
Branch: refs/heads/master
Commit: d529352e03410e1612db691677dc90c855342c01
Parents: 266d248
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Oct 29 20:24:00 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:38 2013 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 2 +-
.../exec/cache/HCDrillSerializableWrapper.java | 63 -------
.../cache/HCSerializableWrapperClasses.java | 31 ----
.../cache/HCVectorAccessibleSerializer.java | 56 ++++++
.../org/apache/drill/exec/cache/HazelCache.java | 37 ++--
.../org/apache/drill/exec/cache/LocalCache.java | 12 +-
.../apache/drill/exec/cache/ProtoBufWrap.java | 8 +-
.../cache/VectorAccessibleSerializable.java | 184 ++++++++++++++++++
.../exec/cache/VectorContainerSerializable.java | 186 -------------------
.../OrderedPartitionRecordBatch.java | 20 +-
.../org/apache/drill/exec/server/Drillbit.java | 6 +-
.../drill/exec/cache/TestVectorCache.java | 8 +-
.../drill/exec/cache/TestWriteToDisk.java | 108 +++++++++++
13 files changed, 403 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 063e60e..c5b169d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -185,7 +185,7 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
- <version>2.5.1</version>
+ <version>3.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
deleted file mode 100644
index 3f2c41c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.DataSerializable;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public abstract class HCDrillSerializableWrapper implements DataSerializable {
-
- private DrillSerializable obj;
-
- public HCDrillSerializableWrapper() {}
-
- public HCDrillSerializableWrapper(DrillSerializable obj) {
- this.obj = obj;
- }
-
- public void readData(DataInput in) throws IOException {
- obj.read(in);
- }
-
- public void writeData(DataOutput out) throws IOException {
- obj.write(out);
- }
-
- public DrillSerializable get() {
- return obj;
- }
-
- /**
- * This is a method that will get a Class specific implementation of HCDrillSerializableWrapper. Class specific implentations
- * are necessary because Hazel Cast requires object that have constructors with no parameters.
- * @param value
- * @param clazz
- * @return
- */
- public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) {
- if (clazz.equals(VectorContainerSerializable.class)) {
- return new HCSerializableWrapperClasses.HCVectorListSerializable(value);
- } else {
- throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
deleted file mode 100644
index d22723a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-public class HCSerializableWrapperClasses {
- public static class HCVectorListSerializable extends HCDrillSerializableWrapper {
-
- public HCVectorListSerializable() {
- super(new VectorContainerSerializable());
- }
-
- public HCVectorListSerializable(DrillSerializable obj) {
- super(obj);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..0d5ba96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+/**
+ * Hazelcast StreamSerializer for VectorAccessibleSerializable objects. It lets them be stored in the Hazelcast implementation of the distributed cache by delegating to their readFromStream/writeToStream methods.
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+ private BufferAllocator allocator;
+
+ public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+ VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+ va.readFromStream(DataInputInputStream.constructInputStream(in));
+ return va;
+ }
+
+ public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+ va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+ }
+
+ public void destroy() {}
+
+ public int getTypeId() {
+ return 1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 577dfeb..9dd4373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,12 +22,15 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.*;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
@@ -35,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hazelcast.config.Config;
+import org.apache.drill.exec.server.DrillbitContext;
public class HazelCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -44,9 +48,11 @@ public class HazelCache implements DistributedCache {
private ITopic<HWorkQueueStatus> workQueueLengths;
private HandlePlan fragments;
private Cache<WorkQueueStatus, Integer> endpoints;
-
- public HazelCache(DrillConfig config) {
+ private BufferAllocator allocator;
+
+ public HazelCache(DrillConfig config, BufferAllocator allocator) {
this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+ this.allocator = allocator;
}
private class Listener implements MessageListener<HWorkQueueStatus>{
@@ -61,7 +67,10 @@ public class HazelCache implements DistributedCache {
public void run() {
Config c = new Config();
+ SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
+ .setTypeClass(VectorAccessibleSerializable.class);
c.setInstanceName(instanceName);
+ c.getSerializationConfig().addSerializerConfig(sc);
instance = getInstanceOrCreateNew(c);
workQueueLengths = instance.getTopic("queue-length");
fragments = new HandlePlan(instance);
@@ -120,11 +129,11 @@ public class HazelCache implements DistributedCache {
@Override
public Counter getCounter(String name) {
- return new HCCounterImpl(this.instance.getAtomicNumber(name));
+ return new HCCounterImpl(this.instance.getAtomicLong(name));
}
public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
- private IMap<String, HCDrillSerializableWrapper> m;
+ private IMap<String, DrillSerializable> m;
private Class<V> clazz;
public HCDistributedMapImpl(IMap m, Class<V> clazz) {
@@ -133,24 +142,24 @@ public class HazelCache implements DistributedCache {
}
public DrillSerializable get(String key) {
- return m.get(key).get();
+ return m.get(key);
}
public void put(String key, DrillSerializable value) {
- m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ m.put(key, value);
}
public void putIfAbsent(String key, DrillSerializable value) {
- m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ m.putIfAbsent(key, value);
}
public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
- m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit);
+ m.putIfAbsent(key, value, ttl, timeunit);
}
}
public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
- private com.hazelcast.core.MultiMap<String, HCDrillSerializableWrapper> mmap;
+ private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
private Class<V> clazz;
public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
@@ -160,22 +169,22 @@ public class HazelCache implements DistributedCache {
public Collection<DrillSerializable> get(String key) {
List<DrillSerializable> list = Lists.newArrayList();
- for (HCDrillSerializableWrapper v : mmap.get(key)) {
- list.add(v.get());
+ for (DrillSerializable v : mmap.get(key)) {
+ list.add(v);
}
return list;
}
@Override
public void put(String key, DrillSerializable value) {
- mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ mmap.put(key, value);
}
}
public static class HCCounterImpl implements Counter {
- private AtomicNumber n;
+ private IAtomicLong n;
- public HCCounterImpl(AtomicNumber n) {
+ public HCCounterImpl(IAtomicLong n) {
this.n = n;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 7ad6ec6..e6275c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.cache;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,11 +32,15 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
public class LocalCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
@@ -44,7 +49,8 @@ public class LocalCache implements DistributedCache {
private volatile ConcurrentMap<Class, DistributedMap> maps;
private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
private volatile ConcurrentMap<String, Counter> counters;
-
+ private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
@Override
public void close() throws IOException {
handles = null;
@@ -116,10 +122,10 @@ public class LocalCache implements DistributedCache {
public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
try {
- DrillSerializable obj = (DrillSerializable)clazz.newInstance();
+ DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
obj.read(in);
return obj;
- } catch (InstantiationException | IllegalAccessException | IOException e) {
+ } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
index 4aea645..448eecd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -21,11 +21,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-import com.hazelcast.nio.DataSerializable;
+import com.hazelcast.nio.serialization.DataSerializable;
public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
@@ -43,7 +45,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
}
@Override
- public void readData(DataInput arg0) throws IOException {
+ public void readData(ObjectDataInput arg0) throws IOException {
int len = arg0.readShort();
byte[] b = new byte[len];
arg0.readFully(b);
@@ -51,7 +53,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
}
@Override
- public void writeData(DataOutput arg0) throws IOException {
+ public void writeData(ObjectDataOutput arg0) throws IOException {
byte[] b = value.toByteArray();
if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
arg0.writeShort(b.length);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
new file mode 100644
index 0000000..24387d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+
+import java.io.*;
+import java.util.List;
+
+public class VectorAccessibleSerializable implements DrillSerializable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
+ static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
+
+ private VectorAccessible va;
+ private BufferAllocator allocator;
+ private int recordCount = -1;
+ private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+ private SelectionVector2 sv2;
+
+ /**
+ *
+ * @param va
+ */
+ public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
+ this.va = va;
+ this.allocator = allocator;
+ }
+
+ public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
+ this.va = va;
+ this.allocator = allocator;
+ this.sv2 = sv2;
+ if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
+
+ public VectorAccessibleSerializable(BufferAllocator allocator) {
+ this.va = new VectorContainer();
+ this.allocator = allocator;
+ }
+
+ @Override
+ public void read(DataInput input) throws IOException {
+ readFromStream(DataInputInputStream.constructInputStream(input));
+ }
+
+ @Override
+ public void readFromStream(InputStream input) throws IOException {
+ VectorContainer container = new VectorContainer();
+ UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+ recordCount = batchDef.getRecordCount();
+ if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+ sv2.allocateNew(recordCount * 2);
+ sv2.getBuffer().setBytes(0, input, recordCount * 2);
+ svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
+ List<ValueVector> vectorList = Lists.newArrayList();
+ List<FieldMetadata> fieldList = batchDef.getFieldList();
+ for (FieldMetadata metaData : fieldList) {
+ int dataLength = metaData.getBufferLength();
+ byte[] bytes = new byte[dataLength];
+ input.read(bytes);
+ MaterializedField field = MaterializedField.create(metaData.getDef());
+ ByteBuf buf = allocator.buffer(dataLength);
+ buf.setBytes(0, bytes);
+ ValueVector vector = TypeHelper.getNewVector(field, allocator);
+ vector.load(metaData, buf);
+ vectorList.add(vector);
+ }
+ container.addCollection(vectorList);
+ container.buildSchema(svMode);
+ container.setRecordCount(recordCount);
+ va = container;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ writeToStream(DataOutputOutputStream.constructOutputStream(output));
+ }
+
+ @Override
+ public void writeToStream(OutputStream output) throws IOException {
+ final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+ ByteBuf[] incomingBuffers = batch.getBuffers();
+ UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+ /* ByteBuf associated with the selection vector */
+ ByteBuf svBuf = null;
+
+ /* Size of the selection vector */
+ int svCount = 0;
+
+ if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+ {
+ svCount = sv2.getCount();
+ svBuf = sv2.getBuffer();
+ }
+
+ int totalBufferLength = 0;
+
+ try
+ {
+ /* Write the metadata to the file */
+ batchDef.writeDelimitedTo(output);
+
+ /* If we have a selection vector, dump it to file first */
+ if (svBuf != null)
+ {
+
+ /* For writing to the selection vectors we use
+ * setChar() method which does not modify the
+ * reader and writer index. To copy the entire buffer
+ * without having to get each byte individually we need
+ * to set the writer index
+ */
+ svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+// fc.write(svBuf.nioBuffers());
+ svBuf.getBytes(0, output, svBuf.readableBytes());
+ svBuf.release();
+ }
+
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (ByteBuf buf : incomingBuffers)
+ {
+ /* dump the buffer into the file channel */
+ int bufLength = buf.readableBytes();
+ buf.getBytes(0, output, bufLength);
+
+ /* compute total length of buffer, will be used when
+ * we create a compound buffer
+ */
+ totalBufferLength += buf.readableBytes();
+ buf.release();
+ }
+
+ output.flush();
+ context.stop();
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ } finally {
+ clear();
+ }
+ }
+
+ private void clear() {
+ }
+
+ public VectorAccessible get() {
+ return va;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
deleted file mode 100644
index 5813dd6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.collect.Lists;
-import com.yammer.metrics.MetricRegistry;
-import com.yammer.metrics.Timer;
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-
-import java.io.*;
-import java.util.List;
-
-public class VectorContainerSerializable implements DrillSerializable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
- static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
-
- private VectorAccessible va;
- private BootStrapContext context;
- private int recordCount = -1;
- private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
- private SelectionVector2 sv2;
-
- /**
- *
- * @param va
- */
- public VectorContainerSerializable(VectorAccessible va){
- this.va = va;
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- }
-
- public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
- this.va = va;
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- this.sv2 = sv2;
- if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
- }
-
- public VectorContainerSerializable() {
- this.va = new VectorContainer();
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- }
-
- @Override
- public void read(DataInput input) throws IOException {
- readFromStream(DataInputInputStream.constructInputStream(input));
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- VectorContainer container = new VectorContainer();
- UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
- recordCount = batchDef.getRecordCount();
- if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
- sv2.allocateNew(recordCount * 2);
- sv2.getBuffer().setBytes(0, input, recordCount * 2);
- svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
- }
- List<ValueVector> vectorList = Lists.newArrayList();
- List<FieldMetadata> fieldList = batchDef.getFieldList();
- for (FieldMetadata metaData : fieldList) {
- int dataLength = metaData.getBufferLength();
- byte[] bytes = new byte[dataLength];
- input.read(bytes);
- MaterializedField field = MaterializedField.create(metaData.getDef());
- ByteBuf buf = context.getAllocator().buffer(dataLength);
- buf.setBytes(0, bytes);
- ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
- vector.load(metaData, buf);
- vectorList.add(vector);
- }
- container.addCollection(vectorList);
- container.buildSchema(svMode);
- container.setRecordCount(recordCount);
- va = container;
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- writeToStream(DataOutputOutputStream.constructOutputStream(output));
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- final Timer.Context context = metrics.timer(WRITER_TIMER).time();
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
-
- ByteBuf[] incomingBuffers = batch.getBuffers();
- UserBitShared.RecordBatchDef batchDef = batch.getDef();
-
- /* ByteBuf associated with the selection vector */
- ByteBuf svBuf = null;
-
- /* Size of the selection vector */
- int svCount = 0;
-
- if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
- {
- svCount = sv2.getCount();
- svBuf = sv2.getBuffer();
- }
-
- int totalBufferLength = 0;
-
- try
- {
- /* Write the metadata to the file */
- batchDef.writeDelimitedTo(output);
-
- /* If we have a selection vector, dump it to file first */
- if (svBuf != null)
- {
-
- /* For writing to the selection vectors we use
- * setChar() method which does not modify the
- * reader and writer index. To copy the entire buffer
- * without having to get each byte individually we need
- * to set the writer index
- */
- svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-// fc.write(svBuf.nioBuffers());
- svBuf.getBytes(0, output, svBuf.readableBytes());
- svBuf.release();
- }
-
- /* Dump the array of ByteBuf's associated with the value vectors */
- for (ByteBuf buf : incomingBuffers)
- {
- /* dump the buffer into the file channel */
- int bufLength = buf.readableBytes();
- buf.getBytes(0, output, bufLength);
-
- /* compute total length of buffer, will be used when
- * we create a compound buffer
- */
- totalBufferLength += buf.readableBytes();
- buf.release();
- }
-
- output.flush();
- context.stop();
- } catch (IOException e)
- {
- throw new RuntimeException(e);
- } finally {
- clear();
- }
- }
-
- private void clear() {
- }
-
- public VectorAccessible get() {
- return va;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ef3886c..e39b82e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -82,7 +82,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private boolean startedUnsampledBatches = false;
private boolean upstreamNone = false;
private int recordCount;
- private DistributedMap<VectorContainerSerializable> tableMap;
+ private DistributedMap<VectorAccessibleSerializable> tableMap;
private DistributedMultiMap mmap;
private String mapKey;
@@ -154,13 +154,13 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
DistributedCache cache = context.getDrillbitContext().getCache();
mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
- mmap = cache.getMultiMap(VectorContainerSerializable.class);
+ mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
List<ValueVector> vectorList = Lists.newArrayList();
for (VectorWrapper vw : containerToCache) {
vectorList.add(vw.getValueVector());
}
- VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
mmap.put(mapKey, wrap);
wrap = null;
@@ -169,24 +169,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
- tableMap = cache.getMap(VectorContainerSerializable.class);
+ tableMap = cache.getMap(VectorAccessibleSerializable.class);
Preconditions.checkNotNull(tableMap);
if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
buildTable();
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
} else if (val < Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
// Wait until sufficient number of fragments have submitted samples, or proceed after 100 ms passed
for (int i = 0; i < 100 && wrap == null; i++) {
Thread.sleep(10);
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
if (i == 99) {
buildTable();
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
}
}
} else {
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
}
Preconditions.checkState(wrap != null);
@@ -211,7 +211,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer);
for (DrillSerializable w : allSamplesWrap) {
- containerBuilder.add(((VectorContainerSerializable)w).get());
+ containerBuilder.add(((VectorAccessibleSerializable)w).get());
}
containerBuilder.build(context);
@@ -239,7 +239,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
- VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 052f415..49732d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -70,7 +70,7 @@ public class Drillbit implements Closeable{
final DistributedCache cache;
final WorkManager manager;
final BootStrapContext context;
-
+
private volatile RegistrationHandle handle;
public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
@@ -86,15 +86,15 @@ public class Drillbit implements Closeable{
this.manager = new WorkManager(context);
this.coord = new ZKClusterCoordinator(config);
this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
- this.cache = new HazelCache(config);
+ this.cache = new HazelCache(config, context.getAllocator());
}
}
public void run() throws Exception {
coord.start(10000);
DrillbitEndpoint md = engine.start();
- cache.run();
manager.start(md, cache, engine.getBitCom(), coord);
+ cache.run();
handle = coord.register(md);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index ffc0274..94aa3dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -46,7 +46,7 @@ public class TestVectorCache {
Drillbit bit = new Drillbit(config, serviceSet);
bit.run();
DrillbitContext context = bit.getContext();
- HazelCache cache = new HazelCache(config);
+ HazelCache cache = new HazelCache(config, context.getAllocator());
cache.run();
MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
@@ -68,11 +68,11 @@ public class TestVectorCache {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
- VectorContainerSerializable wrap = new VectorContainerSerializable(container);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
- DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
+ DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
mmap.put("vectors", wrap);
- VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
+ VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
VectorAccessible newContainer = newWrap.get();
for (VectorWrapper w : newContainer) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
new file mode 100644
index 0000000..11d15d8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.List;
+
+/** Round-trips a small record batch through {@link VectorAccessibleSerializable} via a local file. */
+public class TestWriteToDisk {
+  /** Writes an INT and a VARBINARY vector (four values each) to a file, reads them back and prints them. */
+  @Test
+  public void test() throws Exception {
+    List<ValueVector> vectorList = Lists.newArrayList();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    DrillConfig config = DrillConfig.create();
+    Drillbit bit = new Drillbit(config, serviceSet);
+    bit.run();
+    DrillbitContext context = bit.getContext();
+
+    MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
+    IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
+    MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
+    VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
+    AllocationHelper.allocate(intVector, 4, 4);
+    AllocationHelper.allocate(binVector, 4, 5);
+    vectorList.add(intVector);
+    vectorList.add(binVector);
+
+    intVector.getMutator().set(0, 0); binVector.getMutator().set(0, "ZERO".getBytes());
+    intVector.getMutator().set(1, 1); binVector.getMutator().set(1, "ONE".getBytes());
+    intVector.getMutator().set(2, 2); binVector.getMutator().set(2, "TWO".getBytes());
+    intVector.getMutator().set(3, 3); binVector.getMutator().set(3, "THREE".getBytes());
+    intVector.getMutator().setValueCount(4);
+    binVector.getMutator().setValueCount(4);
+
+    VectorContainer container = new VectorContainer();
+    container.addCollection(vectorList);
+    container.setRecordCount(4);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+    // Write the batch to a file, with "file:///" set as the default file system.
+    Configuration conf = new Configuration();
+    conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path("/tmp/drillSerializable");
+    if (fs.exists(path)) fs.delete(path, false);
+    FSDataOutputStream out = fs.create(path);
+    wrap.writeToStream(out);
+    out.close();
+    // Read it back into a fresh wrapper; close the stream, not fs, which FileSystem.get() may share JVM-wide.
+    FSDataInputStream in = fs.open(path);
+    VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(context.getAllocator());
+    newWrap.readFromStream(in);
+    in.close();
+    // Print every value read back; byte[] values (presumably the VARBINARY column) are decoded to String first.
+    VectorAccessible newContainer = newWrap.get();
+    for (VectorWrapper w : newContainer) {
+      ValueVector vv = w.getValueVector();
+      int values = vv.getAccessor().getValueCount();
+      for (int i = 0; i < values; i++) {
+        Object o = vv.getAccessor().getObject(i);
+        if (o instanceof byte[]) {
+          System.out.println(new String((byte[])o));
+        } else {
+          System.out.println(o);
+        }
+      }
+    }
+  }
+}