Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:18 UTC
[01/10] git commit: DRILL-256 revised patch
Updated Branches:
refs/heads/master 964f01413 -> c287fa604
DRILL-256 revised patch
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c78890a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c78890a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c78890a
Branch: refs/heads/master
Commit: 6c78890a81e4d487926d9bfc3c0eb68edfa07524
Parents: 964f014
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 02:04:01 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:04 2013 -0700
----------------------------------------------------------------------
distribution/src/resources/drill-override.conf | 3 +
.../org/apache/drill/exec/ExecConstants.java | 1 +
.../apache/drill/exec/ops/FragmentContext.java | 5 +
.../physical/base/AbstractPhysicalVisitor.java | 5 +
.../exec/physical/base/PhysicalVisitor.java | 1 +
.../drill/exec/physical/config/Trace.java | 70 +++++
.../drill/exec/physical/impl/ImplCreator.java | 12 +-
.../drill/exec/physical/impl/TraceInjector.java | 90 +++++++
.../physical/impl/trace/TraceBatchCreator.java | 41 +++
.../physical/impl/trace/TraceRecordBatch.java | 259 +++++++++++++++++++
.../drill/exec/record/RecordBatchLoader.java | 3 +-
.../apache/drill/exec/record/WritableBatch.java | 15 +-
.../exec/record/selection/SelectionVector2.java | 44 +++-
.../src/main/resources/drill-module.conf | 3 +
.../impl/trace/TestTraceMultiRecordBatch.java | 84 ++++++
.../impl/trace/TestTraceOutputDump.java | 161 ++++++++++++
.../drill/exec/record/vector/TestLoad.java | 2 +-
.../trace/multi_record_batch_trace.json | 49 ++++
.../src/test/resources/trace/simple_trace.json | 37 +++
.../apache/drill/exec/proto/UserBitShared.java | 100 ++++++-
.../drill/exec/proto/helper/QueryIdHelper.java | 32 +++
protocol/src/main/protobuf/UserBitShared.proto | 3 +-
22 files changed, 993 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index c2ed9df..7694ced 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -71,4 +71,7 @@ drill.exec: {
global.max.width: 100,
executor.threads: 4
}
+ trace: {
+ directory: "/var/log/drill"
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 72776d1..3aec702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -43,4 +43,5 @@ public interface ExecConstants {
public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
+ public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
}
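The new constant resolves against Drill's HOCON configuration stack. A minimal sketch of looking it up, assuming the default config on the classpath (the drill-module.conf change below supplies "/var/log/drill" as the default):

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.ExecConstants;

    public class TraceDirLookup {
        public static void main(String[] args) {
            DrillConfig config = DrillConfig.create();
            // Resolves "drill.exec.trace.directory" from drill-override.conf / drill-module.conf
            String traceDir = config.getString(ExecConstants.TRACE_DUMP_DIRECTORY);
            System.out.println("trace dumps written under: " + traceDir);
        }
    }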
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 352c467..a7f6d2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.ops;
import java.io.IOException;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -141,4 +142,8 @@ public class FragmentContext {
public QueryClassLoader getClassLoader(){
return loader;
}
+
+ public DrillConfig getConfig() {
+ return context.getConfig();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index bf6c68c..9e2ef0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -43,6 +43,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
+ public T visitTrace(Trace trace, X value) throws E{
+ return visitOp(trace, value);
+ }
+
+ @Override
public T visitSort(Sort sort, X value) throws E{
return visitOp(sort, value);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5692b9f..2474c15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -37,6 +37,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
+ public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
new file mode 100644
index 0000000..a81d3e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("trace")
+public class Trace extends AbstractSingle {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Trace.class);
+
+ /* Tag associated with each trace operator.
+ * Printed along with the trace output to distinguish
+ * between multiple trace operators within the same plan.
+ */
+ public final String traceTag;
+
+ public Trace(@JsonProperty("child") PhysicalOperator child, @JsonProperty("tag") String traceTag) {
+ super(child);
+ this.traceTag = traceTag;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitTrace(this, value);
+ }
+
+ @Override
+ public OperatorCost getCost() {
+
+ /* Compute the total size (row count * row size) */
+ Size size = child.getSize();
+ long diskSize = size.getRecordCount() * size.getRecordSize();
+
+ return new OperatorCost(0, diskSize, 0, child.getSize().getRecordCount());
+ }
+
+ @Override
+ public Size getSize() {
+ return child.getSize();
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new Trace(child, traceTag);
+ }
+}
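A minimal sketch of interposing the new operator by hand; wrapWithTrace is a hypothetical helper, not part of this patch:

    import org.apache.drill.exec.physical.base.PhysicalOperator;
    import org.apache.drill.exec.physical.config.Trace;

    public class TraceWrap {
        // Hypothetical helper: places a trace operator above the given subtree.
        // The tag is echoed into the dump file name, keeping multiple trace
        // operators within the same plan distinguishable.
        static PhysicalOperator wrapWithTrace(PhysicalOperator child, String tag) {
            return new Trace(child, tag);
        }
    }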
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 844b3a7..67e9452 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreato
import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
+import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.json.JSONScanBatchCreator;
@@ -73,9 +74,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
private AggBatchCreator abc = new AggBatchCreator();
private MergeJoinCreator mjc = new MergeJoinCreator();
private RootExec root = null;
-
+ private TraceBatchCreator tbc = new TraceBatchCreator();
+
private ImplCreator(){}
-
+
public RootExec getRoot(){
return root;
}
@@ -86,6 +88,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
}
@Override
+ public RecordBatch visitTrace(Trace op, FragmentContext context) throws ExecutionSetupException {
+ return tbc.getBatch(context, op, getChildren(op, context));
+ }
+ @Override
public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(subScan);
Preconditions.checkNotNull(context);
@@ -153,7 +159,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
return fbc.getBatch(context, filter, getChildren(filter, context));
}
-
+
@Override
public RecordBatch visitStreamingAggregate(StreamingAggregate config, FragmentContext context)
throws ExecutionSetupException {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
new file mode 100644
index 0000000..3e82a73
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Trace;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+
+public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
+
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceInjector.class);
+
+ static int traceTagCount = 0;
+
+
+ RootExec root = null;
+ private ScreenCreator sc = new ScreenCreator();
+
+ public static PhysicalOperator getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+ TraceInjector tI = new TraceInjector();
+ PhysicalOperator newOp = root.accept(tI, context);
+
+ return newOp;
+ }
+
+ /**
+ * Traverses the physical plan and injects a trace operator after
+ * every operator.
+ * @param op physical operator under which the trace operators will be injected
+ * @param context fragment context
+ * @return the same physical operator that was passed in, but each of its children
+ * is now a trace operator whose child is the original child of this operator
+ * @throws ExecutionSetupException
+ */
+ @Override
+ public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+
+ List<PhysicalOperator> newChildren = Lists.newArrayList();
+ List<PhysicalOperator> list = null;
+ PhysicalOperator newOp = op;
+
+ /* Get the list of child operators */
+ for (PhysicalOperator child : op)
+ {
+ newChildren.add(child.accept(this, context));
+ }
+
+ list = Lists.newArrayList();
+
+ /* For every child operator create a trace operator as its parent */
+ for (int i = 0; i < newChildren.size(); i++)
+ {
+ String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+
+ /* Trace operator */
+ Trace traceOp = new Trace(newChildren.get(i), traceTag);
+ list.add(traceOp);
+ }
+
+ /* Inject trace operator */
+ if (list.size() > 0)
+ newOp = op.getNewWithChildren(list);
+
+ return newOp;
+ }
+}
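Driving the injector is a single static call; a sketch, assuming the fragment context and root operator come from normal fragment setup:

    import org.apache.drill.common.exceptions.ExecutionSetupException;
    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.physical.base.FragmentRoot;
    import org.apache.drill.exec.physical.base.PhysicalOperator;
    import org.apache.drill.exec.physical.impl.TraceInjector;

    public class InjectTraces {
        // Rewrites the operator tree so every operator's children are wrapped
        // in trace operators; the returned root replaces the original plan.
        static PhysicalOperator injectTraces(FragmentContext context, FragmentRoot root)
                throws ExecutionSetupException {
            return TraceInjector.getExec(context, root);
        }
    }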
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
new file mode 100644
index 0000000..e857c25
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.trace;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class TraceBatchCreator implements BatchCreator<Trace> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
+ //Preconditions.checkArgument(children.size() == 1);
+ return new TraceRecordBatch(config, children.iterator().next(), context);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
new file mode 100644
index 0000000..97131cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.trace;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+
+import io.netty.buffer.ByteBuf;
+
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
+{
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+ private SelectionVector2 sv = null;
+
+ /* Tag associated with each trace operator */
+ final String traceTag;
+
+ /* Location where the log should be dumped */
+ private final String logLocation;
+
+ public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
+ {
+ super(pop, context, incoming);
+ this.traceTag = pop.traceTag;
+ logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+ }
+
+ @Override
+ public int getRecordCount()
+ {
+ if (sv == null)
+ return incoming.getRecordCount();
+ else
+ return sv.getCount();
+ }
+
+ /**
+ * Invoked for every record batch; simply dumps the buffers
+ * associated with all the value vectors in this record batch
+ * to a log file.
+ */
+ @Override
+ protected void doWork()
+ {
+ /* get the selection vector mode */
+ SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+
+ /* Get the array of buffers from the incoming record batch */
+ WritableBatch batch = incoming.getWritableBatch();
+
+ BufferAllocator allocator = context.getAllocator();
+ ByteBuf[] incomingBuffers = batch.getBuffers();
+ RecordBatchDef batchDef = batch.getDef();
+
+ /* Total length of buffers across all value vectors */
+ int totalBufferLength = 0;
+
+ String fileName = getFileName();
+
+ try
+ {
+ File file = new File(fileName);
+
+ if (!file.exists())
+ file.createNewFile();
+
+ FileOutputStream fos = new FileOutputStream(file, true);
+
+ /* Write the metadata to the file */
+ batchDef.writeDelimitedTo(fos);
+
+ FileChannel fc = fos.getChannel();
+
+ /* If we have a selection vector, dump it to file first */
+ if (svMode == SelectionVectorMode.TWO_BYTE)
+ {
+ SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
+ int recordCount = incomingSV2.getCount();
+ int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
+
+ ByteBuf buf = incomingSV2.getBuffer();
+
+ /* Writes to the selection vector use the setChar() method,
+ * which does not advance the reader and writer indexes. To copy
+ * the entire buffer without reading each byte individually we
+ * need to set the writer index explicitly.
+ */
+ buf.writerIndex(sv2Size);
+
+ /* dump the selection vector to log */
+ dumpByteBuf(fc, buf);
+
+ if (sv == null)
+ sv = new SelectionVector2(allocator);
+
+ sv.setRecordCount(recordCount);
+
+ /* create our selection vector from the
+ * incoming selection vector's buffer
+ */
+ sv.setBuffer(buf);
+
+ buf.release();
+ }
+
+ /* For each buffer dump it to log and compute total length */
+ for (ByteBuf buf : incomingBuffers)
+ {
+ /* dump the buffer into the file channel */
+ dumpByteBuf(fc, buf);
+
+ /* Reset reader index on the ByteBuf so we can read it again */
+ buf.resetReaderIndex();
+
+ /* compute total length of buffer, will be used when
+ * we create a compound buffer
+ */
+ totalBufferLength += buf.readableBytes();
+ }
+
+ fc.close();
+ fos.close();
+
+ } catch (IOException e)
+ {
+ logger.error("Unable to write buffer to file: " + fileName);
+ }
+
+ /* allocate memory for the compound buffer, compound buffer
+ * will contain the data from all the buffers across all the
+ * value vectors
+ */
+ ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+
+ /* Copy data from each buffer into the compound buffer */
+ for (int i = 0; i < incomingBuffers.length; i++)
+ {
+ byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
+ }
+
+ List<FieldMetadata> fields = batchDef.getFieldList();
+
+ int bufferOffset = 0;
+
+ /* For each value vector slice up the appropriate size from
+ * the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+ for(VectorWrapper<?> vv : container)
+ {
+ FieldMetadata fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+
+ container.buildSchema(svMode);
+
+ /* Set the record count in the value vector */
+ for(VectorWrapper<?> v : container)
+ {
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(incoming.getRecordCount());
+ }
+ }
+
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException
+ {
+ /* Trace operator does not deal with hyper vectors yet */
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+ /* we have a new schema, clear our existing container to
+ * load the new value vectors
+ */
+ container.clear();
+
+ /* Add all the value vectors in the container */
+ for(VectorWrapper<?> vv : incoming)
+ {
+ TransferPair tp = vv.getValueVector().getTransferPair();
+ container.add(tp.getTo());
+ }
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
+
+ private String getFileName()
+ {
+ /* From the context, get the query id, major fragment id and
+ * minor fragment id. Together they form the file name to
+ * which we dump the incoming buffer data.
+ */
+ FragmentHandle handle = incoming.getContext().getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
+ }
+
+ private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
+ {
+ int bufferLength = buf.readableBytes();
+
+ byte[] byteArray = new byte[bufferLength];
+
+ /* Transfer bytes to a byte array */
+ buf.readBytes(byteArray);
+
+ /* Drop the byte array into the file channel */
+ fc.write(ByteBuffer.wrap(byteArray));
+ }
+}
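The dump file written above holds, per batch, a delimited RecordBatchDef header followed by the raw selection vector (when present) and each value vector's buffer. A minimal sketch of reading the first header back, assuming a path of the form <trace.directory>/<tag>_<queryid>_<major>_<minor> as built by getFileName():

    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;

    public class TraceDumpHeader {
        public static void main(String[] args) throws IOException {
            try (FileInputStream in = new FileInputStream(args[0])) {
                // Reads only the first batch header; later batches are appended after it.
                RecordBatchDef def = RecordBatchDef.parseDelimitedFrom(in);
                System.out.println("records=" + def.getRecordCount()
                        + " sv2=" + def.getIsSelectionVector2()
                        + " fields=" + def.getFieldCount());
            }
        }
    }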
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 4057c58..016f340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -143,7 +143,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
}
public WritableBatch getWritableBatch(){
- return WritableBatch.getBatchNoSVWrap(valueCount, container);
+ boolean isSV2 = (schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE);
+ return WritableBatch.getBatchNoHVWrap(valueCount, container, isSV2);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 5fb9b0a..76b79db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -58,16 +58,16 @@ public class WritableBatch {
return buffers;
}
- public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+ public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
List<ValueVector> vectors = Lists.newArrayList();
for(VectorWrapper<?> vw : vws){
Preconditions.checkArgument(!vw.isHyper());
vectors.add(vw.getValueVector());
}
- return getBatchNoSV(recordCount, vectors);
+ return getBatchNoHV(recordCount, vectors, isSV2);
}
- public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
+ public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
@@ -84,14 +84,17 @@ public class WritableBatch {
vv.clear();
}
- RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
}
public static WritableBatch get(RecordBatch batch) {
- if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
- return getBatchNoSVWrap(batch.getRecordCount(), batch);
+ if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new UnsupportedOperationException("Only batches without hyper selection vectors are writable.");
+
+ boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
+ return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 6105ead..6ff6cf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -35,6 +35,8 @@ public class SelectionVector2 implements Closeable{
private int recordCount;
private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
+ public static final int RECORD_SIZE = 2;
+
public SelectionVector2(BufferAllocator allocator) {
this.allocator = allocator;
}
@@ -43,24 +45,54 @@ public class SelectionVector2 implements Closeable{
return recordCount;
}
+ public ByteBuf getBuffer()
+ {
+ ByteBuf bufferHandle = this.buffer;
+
+ /* Increment the ref count for this buffer */
+ bufferHandle.retain();
+
+ /* We are passing ownership of the buffer to the
+ * caller. clear the buffer from within our selection vector
+ */
+ clear();
+
+ return bufferHandle;
+ }
+
+ public void setBuffer(ByteBuf bufferHandle)
+ {
+ /* clear the existing buffer */
+ clear();
+
+ this.buffer = bufferHandle;
+ buffer.retain();
+ }
+
public char getIndex(int index){
- return buffer.getChar(index*2);
+ return buffer.getChar(index * RECORD_SIZE);
}
public void setIndex(int index, char value){
- buffer.setChar(index*2, value);
+ buffer.setChar(index * RECORD_SIZE, value);
}
public void allocateNew(int size){
clear();
- buffer = allocator.buffer(size * 2);
+ buffer = allocator.buffer(size * RECORD_SIZE);
}
-
+
public SelectionVector2 clone(){
SelectionVector2 newSV = new SelectionVector2(allocator);
newSV.recordCount = recordCount;
newSV.buffer = buffer;
+
+ /* Since buffer and newSV.buffer essentially point to the
+ * same buffer, if we don't do a retain() on the newSV's
+ * buffer, it might get freed.
+ */
+ newSV.buffer.retain();
clear();
return newSV;
}
@@ -82,6 +114,6 @@ public class SelectionVector2 implements Closeable{
public void close() throws IOException {
clear();
}
-
-
+
+
}
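The getBuffer()/setBuffer() pair added here is an explicit ownership hand-off on the underlying ByteBuf: getBuffer() retains once on behalf of the caller and clears the source vector, setBuffer() retains again for the destination, so the caller must release its own handle when done. A sketch of the round trip TraceRecordBatch performs above:

    import io.netty.buffer.ByteBuf;

    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.selection.SelectionVector2;

    public class Sv2Transfer {
        // Moves an SV2's buffer into a fresh selection vector, mirroring the
        // retain/clear/release sequence in TraceRecordBatch.doWork().
        static SelectionVector2 transfer(SelectionVector2 incoming, BufferAllocator allocator) {
            int count = incoming.getCount();    // read before getBuffer() clears the source
            ByteBuf buf = incoming.getBuffer(); // +1 ref for us; incoming is now cleared
            SelectionVector2 copy = new SelectionVector2(allocator);
            copy.setRecordCount(count);
            copy.setBuffer(buf);                // the copy retains its own reference
            buf.release();                      // drop our handle
            return copy;
        }
    }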
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7e40a05..ca78eb5 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -67,4 +67,7 @@ drill.exec: {
global.max.width: 100,
executor.threads: 4
}
+ trace: {
+ directory: "/var/log/drill"
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
new file mode 100644
index 0000000..ea14b08
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.trace;
+
+import static org.junit.Assert.*;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+/*
+ * This test uses a physical plan with a mock scan that generates 100k records.
+ * Here we inject the "trace" operator at two locations in the plan.
+ *
+ * The goal of this test is to make sure the trace operator works correctly across
+ * multiple record batches and when there is a selection vector present in the
+ * incoming container of the trace operator.
+ */
+public class TestTraceMultiRecordBatch {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTraceMultiRecordBatch.class);
+ DrillConfig c = DrillConfig.create();
+
+
+ @Test
+ public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+ {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getConfig(); result = c;
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/multi_record_batch_trace.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ while(exec.next()) {
+ }
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
new file mode 100644
index 0000000..92faf8d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.trace;
+
+import static org.junit.Assert.*;
+
+import io.netty.buffer.ByteBufInputStream;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+
+import io.netty.buffer.ByteBuf;
+
+/*
+ * This test uses a simple physical plan with a mock scan that
+ * generates one row. The physical plan also contains the
+ * trace operator, which dumps the records as bytes to the
+ * log file.
+ *
+ * The objective of this test is not only to verify that the injected
+ * trace operator dumps the output to the log file, but also to read
+ * the dumped output back and verify that it matches what we expect.
+ * Since our scan produces only one record, we expect the record count
+ * to be one and no selection vectors to be present, and we know the
+ * value of the dumped record (Integer.MIN_VALUE), so we compare it
+ * with this known value.
+ */
+public class TestTraceOutputDump {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTraceOutputDump.class);
+ DrillConfig c = DrillConfig.create();
+
+
+ @Test
+ public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+ {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getConfig(); result = c;
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ while(exec.next()){
+ }
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+
+ FragmentHandle handle = context.getHandle();
+
+ /* Form the file name to which the trace output will dump the record batches */
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String logLocation = c.getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ System.out.println("Found log location: " + logLocation);
+
+ String filename = new String(logLocation + "/" + "mock-scan" + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
+
+ System.out.println("File Name: " + filename);
+
+ File file = new File(filename);
+
+ if (!file.exists())
+ throw new IOException("Trace file not created");
+
+ FileInputStream input = new FileInputStream(file.getAbsoluteFile());
+ FileChannel fc = input.getChannel();
+ int size = (int) fc.size();
+ BufferAllocator allocator = context.getAllocator();
+ ByteBuffer buffer = ByteBuffer.allocate((int) fc.size());
+ ByteBuf buf = allocator.buffer(size);
+
+ int readSize;
+
+ /* Read the file into a ByteBuffer and transfer it into our ByteBuf */
+ while ((readSize = (fc.read(buffer))) > 0)
+ {
+ buffer.position(0).limit(readSize);
+ buf.writeBytes(buffer);
+ buffer.clear();
+ }
+
+ final ByteBufInputStream is = new ByteBufInputStream(buf, buf.readableBytes());
+
+ RecordBatchDef batchDef = RecordBatchDef.parseDelimitedFrom(is);
+
+ /* Assert there are no selection vectors */
+ assertTrue(!batchDef.getIsSelectionVector2());
+
+ /* Assert there is only one record */
+ assertTrue(batchDef.getRecordCount() == 1);
+
+ /* Read the Integer value and ASSERT its Integer.MIN_VALUE */
+ int value = buf.getInt(buf.readerIndex());
+ assertTrue(value == Integer.MIN_VALUE);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
index 94d66d4..4c45960 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
@@ -66,7 +66,7 @@ public class TestLoad {
v.getMutator().setValueCount(100);
}
- WritableBatch writableBatch = WritableBatch.getBatchNoSV(100, vectors);
+ WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false);
RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
ByteBuf[] byteBufs = writableBatch.getBuffers();
int bytes = 0;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json b/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
new file mode 100644
index 0000000..26ddb3d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
@@ -0,0 +1,49 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 100000, types: [
+ {name: "blue", type: "BIGINT", mode: "REQUIRED"}
+ ]}
+ ]
+ },
+{
+ pop : "trace",
+ @id : 2,
+ tag : "mock-sub-scan",
+ child : 1
+ },
+{
+ pop : "filter",
+ @id : 3,
+ child : 2,
+ expr : " (blue) < (1) "
+ },
+ {
+ pop : "trace",
+ @id : 4,
+ tag : "trace-with-sv",
+ child : 3
+ },
+{
+ pop : "selection-vector-remover",
+ @id : 5,
+ child : 4
+},
+ {
+ @id: 6,
+ child: 5,
+ pop: "screen"
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/resources/trace/simple_trace.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/trace/simple_trace.json b/exec/java-exec/src/test/resources/trace/simple_trace.json
new file mode 100644
index 0000000..8dbea70
--- /dev/null
+++ b/exec/java-exec/src/test/resources/trace/simple_trace.json
@@ -0,0 +1,37 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"mock-sub-scan",
+ url: "http://apache.org",
+ entries:[
+ {records: 1, types: [
+ {name: "red", type: "INT", mode: "REQUIRED"}
+ ]}
+ ]
+ }, {
+ @id:2,
+ child: 1,
+ pop:"project",
+ exprs: [
+ { ref: "col1", expr:"red" }
+ ]
+ }, {
+ pop : "trace",
+ @id : 3,
+ child : 2,
+ tag : "mock-scan"
+ }, {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index faf67cd..f305c00 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2909,6 +2909,16 @@ public final class UserBitShared {
* <code>optional int32 record_count = 2;</code>
*/
int getRecordCount();
+
+ // optional bool isSelectionVector2 = 3;
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ boolean hasIsSelectionVector2();
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ boolean getIsSelectionVector2();
}
/**
* Protobuf type {@code exec.shared.RecordBatchDef}
@@ -2974,6 +2984,11 @@ public final class UserBitShared {
recordCount_ = input.readInt32();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000002;
+ isSelectionVector2_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3069,9 +3084,26 @@ public final class UserBitShared {
return recordCount_;
}
+ // optional bool isSelectionVector2 = 3;
+ public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+ private boolean isSelectionVector2_;
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public boolean hasIsSelectionVector2() {
+ return ((bitField0_ & 0x00000002) == 0x00000002);
+ }
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public boolean getIsSelectionVector2() {
+ return isSelectionVector2_;
+ }
+
private void initFields() {
field_ = java.util.Collections.emptyList();
recordCount_ = 0;
+ isSelectionVector2_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3091,6 +3123,9 @@ public final class UserBitShared {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeInt32(2, recordCount_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ output.writeBool(3, isSelectionVector2_);
+ }
getUnknownFields().writeTo(output);
}
@@ -3108,6 +3143,10 @@ public final class UserBitShared {
size += com.google.protobuf.CodedOutputStream
.computeInt32Size(2, recordCount_);
}
+ if (((bitField0_ & 0x00000002) == 0x00000002)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(3, isSelectionVector2_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3233,6 +3272,8 @@ public final class UserBitShared {
}
recordCount_ = 0;
bitField0_ = (bitField0_ & ~0x00000002);
+ isSelectionVector2_ = false;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -3274,6 +3315,10 @@ public final class UserBitShared {
to_bitField0_ |= 0x00000001;
}
result.recordCount_ = recordCount_;
+ if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+ to_bitField0_ |= 0x00000002;
+ }
+ result.isSelectionVector2_ = isSelectionVector2_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3319,6 +3364,9 @@ public final class UserBitShared {
if (other.hasRecordCount()) {
setRecordCount(other.getRecordCount());
}
+ if (other.hasIsSelectionVector2()) {
+ setIsSelectionVector2(other.getIsSelectionVector2());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3619,6 +3667,39 @@ public final class UserBitShared {
return this;
}
+ // optional bool isSelectionVector2 = 3;
+ private boolean isSelectionVector2_ ;
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public boolean hasIsSelectionVector2() {
+ return ((bitField0_ & 0x00000004) == 0x00000004);
+ }
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public boolean getIsSelectionVector2() {
+ return isSelectionVector2_;
+ }
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public Builder setIsSelectionVector2(boolean value) {
+ bitField0_ |= 0x00000004;
+ isSelectionVector2_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool isSelectionVector2 = 3;</code>
+ */
+ public Builder clearIsSelectionVector2() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ isSelectionVector2_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:exec.shared.RecordBatchDef)
}
@@ -4903,15 +4984,16 @@ public final class UserBitShared {
"ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
"ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
"art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
- "_row\030\005 \001(\005\"\r\n\013RecordBatch\"Q\n\016RecordBatch",
+ "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
"Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
- "tadata\022\024\n\014record_count\030\002 \001(\005\"\261\001\n\rFieldMe" +
- "tadata\022\033\n\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013v" +
- "alue_count\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(" +
- "\005\022\023\n\013group_count\030\004 \001(\005\022\025\n\rbuffer_length\030" +
- "\005 \001(\005\022)\n\005child\030\006 \003(\0132\032.exec.shared.Field" +
- "MetadataB.\n\033org.apache.drill.exec.protoB" +
- "\rUserBitSharedH\001"
+ "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
+ "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
+ "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
+ " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
+ "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
+ "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
+ "org.apache.drill.exec.protoB\rUserBitShar" +
+ "edH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4947,7 +5029,7 @@ public final class UserBitShared {
internal_static_exec_shared_RecordBatchDef_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_exec_shared_RecordBatchDef_descriptor,
- new java.lang.String[] { "Field", "RecordCount", });
+ new java.lang.String[] { "Field", "RecordCount", "IsSelectionVector2", });
internal_static_exec_shared_FieldMetadata_descriptor =
getDescriptor().getMessageTypes().get(5);
internal_static_exec_shared_FieldMetadata_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
new file mode 100644
index 0000000..749b3d5
--- /dev/null
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.proto.helper;
+
+import java.util.UUID;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/* Helper class around the QueryId protobuf */
+public class QueryIdHelper {
+
+ /* Generate a UUID from the two parts of the queryid */
+ public static String getQueryId(QueryId queryId)
+ {
+ return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 7e2506c..5bea284 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,8 @@ message RecordBatch{
message RecordBatchDef {
repeated FieldMetadata field = 1;
optional int32 record_count = 2;
-
+ optional bool isSelectionVector2 = 3;
+
}
message FieldMetadata {
[04/10] git commit: DRILL-271 refactor DistributedCache code. Uses Hazelcast 3.1 and custom serialization.
Posted by ja...@apache.org.
DRILL-271 refactor DistributedCache code. Uses Hazelcast 3.1 and custom serialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d529352e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d529352e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d529352e
Branch: refs/heads/master
Commit: d529352e03410e1612db691677dc90c855342c01
Parents: 266d248
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Oct 29 20:24:00 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:38 2013 -0700
----------------------------------------------------------------------
exec/java-exec/pom.xml | 2 +-
.../exec/cache/HCDrillSerializableWrapper.java | 63 -------
.../cache/HCSerializableWrapperClasses.java | 31 ----
.../cache/HCVectorAccessibleSerializer.java | 56 ++++++
.../org/apache/drill/exec/cache/HazelCache.java | 37 ++--
.../org/apache/drill/exec/cache/LocalCache.java | 12 +-
.../apache/drill/exec/cache/ProtoBufWrap.java | 8 +-
.../cache/VectorAccessibleSerializable.java | 184 ++++++++++++++++++
.../exec/cache/VectorContainerSerializable.java | 186 -------------------
.../OrderedPartitionRecordBatch.java | 20 +-
.../org/apache/drill/exec/server/Drillbit.java | 6 +-
.../drill/exec/cache/TestVectorCache.java | 8 +-
.../drill/exec/cache/TestWriteToDisk.java | 108 +++++++++++
13 files changed, 403 insertions(+), 318 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 063e60e..c5b169d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -185,7 +185,7 @@
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
- <version>2.5.1</version>
+ <version>3.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
deleted file mode 100644
index 3f2c41c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.DataSerializable;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public abstract class HCDrillSerializableWrapper implements DataSerializable {
-
- private DrillSerializable obj;
-
- public HCDrillSerializableWrapper() {}
-
- public HCDrillSerializableWrapper(DrillSerializable obj) {
- this.obj = obj;
- }
-
- public void readData(DataInput in) throws IOException {
- obj.read(in);
- }
-
- public void writeData(DataOutput out) throws IOException {
- obj.write(out);
- }
-
- public DrillSerializable get() {
- return obj;
- }
-
- /**
- * This is a method that will get a Class specific implementation of HCDrillSerializableWrapper. Class specific implementations
- * are necessary because Hazelcast requires objects that have constructors with no parameters.
- * @param value
- * @param clazz
- * @return
- */
- public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) {
- if (clazz.equals(VectorContainerSerializable.class)) {
- return new HCSerializableWrapperClasses.HCVectorListSerializable(value);
- } else {
- throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
deleted file mode 100644
index d22723a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-public class HCSerializableWrapperClasses {
- public static class HCVectorListSerializable extends HCDrillSerializableWrapper {
-
- public HCVectorListSerializable() {
- super(new VectorContainerSerializable());
- }
-
- public HCVectorListSerializable(DrillSerializable obj) {
- super(obj);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..0d5ba96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+/**
+ * Hazelcast StreamSerializer for VectorAccessibleSerializable objects, so that they can be put in the Hazelcast implementation of the distributed cache
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+ private BufferAllocator allocator;
+
+ public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+ VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+ va.readFromStream(DataInputInputStream.constructInputStream(in));
+ return va;
+ }
+
+ public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+ va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+ }
+
+ public void destroy() {}
+
+ public int getTypeId() {
+ return 1;
+ }
+}
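The serializer only takes effect once it is registered against its type; the HazelCache change below does exactly that. A condensed sketch of the registration, assuming the allocator comes from the Drillbit context:

  import com.hazelcast.config.Config;
  import com.hazelcast.config.SerializerConfig;
  import com.hazelcast.core.Hazelcast;
  import com.hazelcast.core.HazelcastInstance;
  import org.apache.drill.exec.cache.HCVectorAccessibleSerializer;
  import org.apache.drill.exec.cache.VectorAccessibleSerializable;
  import org.apache.drill.exec.memory.BufferAllocator;

  public class SerializerRegistrationSketch {
    static HazelcastInstance start(BufferAllocator allocator) {
      Config c = new Config();
      // Tie the custom StreamSerializer to the type it handles; Hazelcast
      // then applies it transparently to map and multimap values.
      SerializerConfig sc = new SerializerConfig()
          .setImplementation(new HCVectorAccessibleSerializer(allocator))
          .setTypeClass(VectorAccessibleSerializable.class);
      c.getSerializationConfig().addSerializerConfig(sc);
      return Hazelcast.newHazelcastInstance(c);
    }
  }

Hazelcast requires each custom serializer's type id to be a positive integer unique within the cluster, which is what getTypeId() above supplies.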
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 577dfeb..9dd4373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,12 +22,15 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.hazelcast.config.SerializerConfig;
import com.hazelcast.core.*;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
@@ -35,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.hazelcast.config.Config;
+import org.apache.drill.exec.server.DrillbitContext;
public class HazelCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -44,9 +48,11 @@ public class HazelCache implements DistributedCache {
private ITopic<HWorkQueueStatus> workQueueLengths;
private HandlePlan fragments;
private Cache<WorkQueueStatus, Integer> endpoints;
-
- public HazelCache(DrillConfig config) {
+ private BufferAllocator allocator;
+
+ public HazelCache(DrillConfig config, BufferAllocator allocator) {
this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+ this.allocator = allocator;
}
private class Listener implements MessageListener<HWorkQueueStatus>{
@@ -61,7 +67,10 @@ public class HazelCache implements DistributedCache {
public void run() {
Config c = new Config();
+ SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
+ .setTypeClass(VectorAccessibleSerializable.class);
c.setInstanceName(instanceName);
+ c.getSerializationConfig().addSerializerConfig(sc);
instance = getInstanceOrCreateNew(c);
workQueueLengths = instance.getTopic("queue-length");
fragments = new HandlePlan(instance);
@@ -120,11 +129,11 @@ public class HazelCache implements DistributedCache {
@Override
public Counter getCounter(String name) {
- return new HCCounterImpl(this.instance.getAtomicNumber(name));
+ return new HCCounterImpl(this.instance.getAtomicLong(name));
}
public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
- private IMap<String, HCDrillSerializableWrapper> m;
+ private IMap<String, DrillSerializable> m;
private Class<V> clazz;
public HCDistributedMapImpl(IMap m, Class<V> clazz) {
@@ -133,24 +142,24 @@ public class HazelCache implements DistributedCache {
}
public DrillSerializable get(String key) {
- return m.get(key).get();
+ return m.get(key);
}
public void put(String key, DrillSerializable value) {
- m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ m.put(key, value);
}
public void putIfAbsent(String key, DrillSerializable value) {
- m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ m.putIfAbsent(key, value);
}
public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
- m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit);
+ m.putIfAbsent(key, value, ttl, timeunit);
}
}
public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
- private com.hazelcast.core.MultiMap<String, HCDrillSerializableWrapper> mmap;
+ private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
private Class<V> clazz;
public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
@@ -160,22 +169,22 @@ public class HazelCache implements DistributedCache {
public Collection<DrillSerializable> get(String key) {
List<DrillSerializable> list = Lists.newArrayList();
- for (HCDrillSerializableWrapper v : mmap.get(key)) {
- list.add(v.get());
+ for (DrillSerializable v : mmap.get(key)) {
+ list.add(v);
}
return list;
}
@Override
public void put(String key, DrillSerializable value) {
- mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+ mmap.put(key, value);
}
}
public static class HCCounterImpl implements Counter {
- private AtomicNumber n;
+ private IAtomicLong n;
- public HCCounterImpl(AtomicNumber n) {
+ public HCCounterImpl(IAtomicLong n) {
this.n = n;
}
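The Counter move from the Hazelcast 2.x AtomicNumber to the 3.x IAtomicLong is a thin delegation. A sketch of the complete wrapper; the method names on Drill's Counter interface are assumed from their use elsewhere in this series (incrementAndGet appears in OrderedPartitionRecordBatch):

  import com.hazelcast.core.IAtomicLong;

  public class AtomicLongCounterSketch {
    private final IAtomicLong n;

    public AtomicLongCounterSketch(IAtomicLong n) { this.n = n; }

    // Each call is a cluster-wide atomic operation on the named long.
    public long get()             { return n.get(); }
    public long incrementAndGet() { return n.incrementAndGet(); }
    public long decrementAndGet() { return n.decrementAndGet(); }
  }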
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 7ad6ec6..e6275c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.cache;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -31,11 +32,15 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
public class LocalCache implements DistributedCache {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
@@ -44,7 +49,8 @@ public class LocalCache implements DistributedCache {
private volatile ConcurrentMap<Class, DistributedMap> maps;
private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
private volatile ConcurrentMap<String, Counter> counters;
-
+ private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
@Override
public void close() throws IOException {
handles = null;
@@ -116,10 +122,10 @@ public class LocalCache implements DistributedCache {
public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
try {
- DrillSerializable obj = (DrillSerializable)clazz.newInstance();
+ DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
obj.read(in);
return obj;
- } catch (InstantiationException | IllegalAccessException | IOException e) {
+ } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
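The deserialize() change establishes a contract: every DrillSerializable cached through LocalCache must now expose a single-argument BufferAllocator constructor, since the old no-arg instantiation path is gone. A minimal sketch of that reflective instantiation in isolation:

  import java.lang.reflect.InvocationTargetException;
  import org.apache.drill.exec.memory.BufferAllocator;

  public class AllocatorConstructorSketch {
    // Mirrors LocalCache.deserialize(): fails at runtime with a wrapped
    // NoSuchMethodException if clazz lacks a (BufferAllocator) constructor.
    static <T> T newInstance(Class<T> clazz, BufferAllocator allocator) {
      try {
        return clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
      } catch (InstantiationException | IllegalAccessException
             | NoSuchMethodException | InvocationTargetException e) {
        throw new RuntimeException(e);
      }
    }
  }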
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
index 4aea645..448eecd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -21,11 +21,13 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
-import com.hazelcast.nio.DataSerializable;
+import com.hazelcast.nio.serialization.DataSerializable;
public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
@@ -43,7 +45,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
}
@Override
- public void readData(DataInput arg0) throws IOException {
+ public void readData(ObjectDataInput arg0) throws IOException {
int len = arg0.readShort();
byte[] b = new byte[len];
arg0.readFully(b);
@@ -51,7 +53,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
}
@Override
- public void writeData(DataOutput arg0) throws IOException {
+ public void writeData(ObjectDataOutput arg0) throws IOException {
byte[] b = value.toByteArray();
if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
arg0.writeShort(b.length);
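ProtoBufWrap's framing is a two-byte length prefix followed by the raw message bytes, which caps each value at Short.MAX_VALUE (32,767) bytes. A generic restatement of the same framing against plain DataInput/DataOutput, not Drill code:

  import java.io.DataInput;
  import java.io.DataOutput;
  import java.io.IOException;
  import com.google.protobuf.MessageLite;
  import com.google.protobuf.Parser;

  public class ShortFramedProto {
    // Write: 2-byte length prefix, then the serialized message.
    static void write(DataOutput out, MessageLite msg) throws IOException {
      byte[] b = msg.toByteArray();
      if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
      out.writeShort(b.length);
      out.write(b);
    }

    // Read: length prefix first, then exactly that many bytes for the parser.
    static <T extends MessageLite> T read(DataInput in, Parser<T> parser) throws IOException {
      int len = in.readShort();
      byte[] b = new byte[len];
      in.readFully(b);
      return parser.parseFrom(b);
    }
  }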
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
new file mode 100644
index 0000000..24387d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+
+import java.io.*;
+import java.util.List;
+
+public class VectorAccessibleSerializable implements DrillSerializable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
+ static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
+
+ private VectorAccessible va;
+ private BufferAllocator allocator;
+ private int recordCount = -1;
+ private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+ private SelectionVector2 sv2;
+
+ /**
+ *
+ * @param va
+ */
+ public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
+ this.va = va;
+ this.allocator = allocator;
+ }
+
+ public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
+ this.va = va;
+ this.allocator = allocator;
+ this.sv2 = sv2;
+ if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
+
+ public VectorAccessibleSerializable(BufferAllocator allocator) {
+ this.va = new VectorContainer();
+ this.allocator = allocator;
+ }
+
+ @Override
+ public void read(DataInput input) throws IOException {
+ readFromStream(DataInputInputStream.constructInputStream(input));
+ }
+
+ @Override
+ public void readFromStream(InputStream input) throws IOException {
+ VectorContainer container = new VectorContainer();
+ UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+ recordCount = batchDef.getRecordCount();
+ if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+ sv2.allocateNew(recordCount * 2);
+ sv2.getBuffer().setBytes(0, input, recordCount * 2);
+ svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
+ List<ValueVector> vectorList = Lists.newArrayList();
+ List<FieldMetadata> fieldList = batchDef.getFieldList();
+ for (FieldMetadata metaData : fieldList) {
+ int dataLength = metaData.getBufferLength();
+ byte[] bytes = new byte[dataLength];
+ /* InputStream.read may return a short count; readFully guarantees the whole buffer */
+ new DataInputStream(input).readFully(bytes);
+ MaterializedField field = MaterializedField.create(metaData.getDef());
+ ByteBuf buf = allocator.buffer(dataLength);
+ buf.setBytes(0, bytes);
+ ValueVector vector = TypeHelper.getNewVector(field, allocator);
+ vector.load(metaData, buf);
+ vectorList.add(vector);
+ }
+ container.addCollection(vectorList);
+ container.buildSchema(svMode);
+ container.setRecordCount(recordCount);
+ va = container;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ writeToStream(DataOutputOutputStream.constructOutputStream(output));
+ }
+
+ @Override
+ public void writeToStream(OutputStream output) throws IOException {
+ final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+ ByteBuf[] incomingBuffers = batch.getBuffers();
+ UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+ /* ByteBuf associated with the selection vector */
+ ByteBuf svBuf = null;
+
+ /* Size of the selection vector */
+ int svCount = 0;
+
+ if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+ {
+ svCount = sv2.getCount();
+ svBuf = sv2.getBuffer();
+ }
+
+ int totalBufferLength = 0;
+
+ try
+ {
+ /* Write the metadata to the file */
+ batchDef.writeDelimitedTo(output);
+
+ /* If we have a selection vector, dump it to file first */
+ if (svBuf != null)
+ {
+
+ /* For writing to the selection vectors we use
+ * setChar() method which does not modify the
+ * reader and writer index. To copy the entire buffer
+ * without having to get each byte individually we need
+ * to set the writer index
+ */
+ svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+// fc.write(svBuf.nioBuffers());
+ svBuf.getBytes(0, output, svBuf.readableBytes());
+ svBuf.release();
+ }
+
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (ByteBuf buf : incomingBuffers)
+ {
+ /* dump the buffer into the file channel */
+ int bufLength = buf.readableBytes();
+ buf.getBytes(0, output, bufLength);
+
+ /* compute total length of buffer, will be used when
+ * we create a compound buffer
+ */
+ totalBufferLength += buf.readableBytes();
+ buf.release();
+ }
+
+ output.flush();
+ context.stop();
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ } finally {
+ clear();
+ }
+ }
+
+ private void clear() {
+ }
+
+ public VectorAccessible get() {
+ return va;
+ }
+}
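A minimal round-trip sketch for the class above, for a batch without a selection vector; `va` and `allocator` are assumed to come from a running Drillbit. Note that writeToStream() releases the batch's buffers, so the caller must not touch them afterwards:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.IOException;
  import org.apache.drill.exec.cache.VectorAccessibleSerializable;
  import org.apache.drill.exec.memory.BufferAllocator;
  import org.apache.drill.exec.record.VectorAccessible;

  public class VectorRoundTripSketch {
    static VectorAccessible roundTrip(VectorAccessible va, BufferAllocator allocator)
        throws IOException {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      new VectorAccessibleSerializable(va, allocator).writeToStream(out); // releases va's buffers

      VectorAccessibleSerializable read = new VectorAccessibleSerializable(allocator);
      read.readFromStream(new ByteArrayInputStream(out.toByteArray()));
      return read.get(); // a freshly loaded VectorContainer
    }
  }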
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
deleted file mode 100644
index 5813dd6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.collect.Lists;
-import com.yammer.metrics.MetricRegistry;
-import com.yammer.metrics.Timer;
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-
-import java.io.*;
-import java.util.List;
-
-public class VectorContainerSerializable implements DrillSerializable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
- static final MetricRegistry metrics = DrillMetrics.getInstance();
- static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
-
- private VectorAccessible va;
- private BootStrapContext context;
- private int recordCount = -1;
- private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
- private SelectionVector2 sv2;
-
- /**
- *
- * @param va
- */
- public VectorContainerSerializable(VectorAccessible va){
- this.va = va;
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- }
-
- public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
- this.va = va;
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- this.sv2 = sv2;
- if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
- }
-
- public VectorContainerSerializable() {
- this.va = new VectorContainer();
- this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
- }
-
- @Override
- public void read(DataInput input) throws IOException {
- readFromStream(DataInputInputStream.constructInputStream(input));
- }
-
- @Override
- public void readFromStream(InputStream input) throws IOException {
- VectorContainer container = new VectorContainer();
- UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
- recordCount = batchDef.getRecordCount();
- if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
- sv2.allocateNew(recordCount * 2);
- sv2.getBuffer().setBytes(0, input, recordCount * 2);
- svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
- }
- List<ValueVector> vectorList = Lists.newArrayList();
- List<FieldMetadata> fieldList = batchDef.getFieldList();
- for (FieldMetadata metaData : fieldList) {
- int dataLength = metaData.getBufferLength();
- byte[] bytes = new byte[dataLength];
- input.read(bytes);
- MaterializedField field = MaterializedField.create(metaData.getDef());
- ByteBuf buf = context.getAllocator().buffer(dataLength);
- buf.setBytes(0, bytes);
- ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
- vector.load(metaData, buf);
- vectorList.add(vector);
- }
- container.addCollection(vectorList);
- container.buildSchema(svMode);
- container.setRecordCount(recordCount);
- va = container;
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- writeToStream(DataOutputOutputStream.constructOutputStream(output));
- }
-
- @Override
- public void writeToStream(OutputStream output) throws IOException {
- final Timer.Context context = metrics.timer(WRITER_TIMER).time();
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
-
- ByteBuf[] incomingBuffers = batch.getBuffers();
- UserBitShared.RecordBatchDef batchDef = batch.getDef();
-
- /* ByteBuf associated with the selection vector */
- ByteBuf svBuf = null;
-
- /* Size of the selection vector */
- int svCount = 0;
-
- if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
- {
- svCount = sv2.getCount();
- svBuf = sv2.getBuffer();
- }
-
- int totalBufferLength = 0;
-
- try
- {
- /* Write the metadata to the file */
- batchDef.writeDelimitedTo(output);
-
- /* If we have a selection vector, dump it to file first */
- if (svBuf != null)
- {
-
- /* For writing to the selection vectors we use
- * setChar() method which does not modify the
- * reader and writer index. To copy the entire buffer
- * without having to get each byte individually we need
- * to set the writer index
- */
- svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-// fc.write(svBuf.nioBuffers());
- svBuf.getBytes(0, output, svBuf.readableBytes());
- svBuf.release();
- }
-
- /* Dump the array of ByteBuf's associated with the value vectors */
- for (ByteBuf buf : incomingBuffers)
- {
- /* dump the buffer into the file channel */
- int bufLength = buf.readableBytes();
- buf.getBytes(0, output, bufLength);
-
- /* compute total length of buffer, will be used when
- * we create a compound buffer
- */
- totalBufferLength += buf.readableBytes();
- buf.release();
- }
-
- output.flush();
- context.stop();
- } catch (IOException e)
- {
- throw new RuntimeException(e);
- } finally {
- clear();
- }
- }
-
- private void clear() {
- }
-
- public VectorAccessible get() {
- return va;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ef3886c..e39b82e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -82,7 +82,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private boolean startedUnsampledBatches = false;
private boolean upstreamNone = false;
private int recordCount;
- private DistributedMap<VectorContainerSerializable> tableMap;
+ private DistributedMap<VectorAccessibleSerializable> tableMap;
private DistributedMultiMap mmap;
private String mapKey;
@@ -154,13 +154,13 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
DistributedCache cache = context.getDrillbitContext().getCache();
mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
- mmap = cache.getMultiMap(VectorContainerSerializable.class);
+ mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
List<ValueVector> vectorList = Lists.newArrayList();
for (VectorWrapper vw : containerToCache) {
vectorList.add(vw.getValueVector());
}
- VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
mmap.put(mapKey, wrap);
wrap = null;
@@ -169,24 +169,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
long val = minorFragmentSampleCount.incrementAndGet();
logger.debug("Incremented mfsc, got {}", val);
- tableMap = cache.getMap(VectorContainerSerializable.class);
+ tableMap = cache.getMap(VectorAccessibleSerializable.class);
Preconditions.checkNotNull(tableMap);
if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
buildTable();
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
} else if (val < Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
// Wait until sufficient number of fragments have submitted samples, or proceed after 100 ms passed
for (int i = 0; i < 100 && wrap == null; i++) {
Thread.sleep(10);
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
if (i == 99) {
buildTable();
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
}
}
} else {
- wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+ wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
}
Preconditions.checkState(wrap != null);
@@ -211,7 +211,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer);
for (DrillSerializable w : allSamplesWrap) {
- containerBuilder.add(((VectorContainerSerializable)w).get());
+ containerBuilder.add(((VectorAccessibleSerializable)w).get());
}
containerBuilder.build(context);
@@ -239,7 +239,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
- VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
}
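The sampling logic above polls the distributed map for the final partition table for up to roughly one second (100 iterations of 10 ms) before giving up and building the table itself. A condensed sketch of that wait-or-build pattern; `buildTable` stands in for the batch's own buildTable() method:

  import org.apache.drill.exec.cache.DistributedMap;
  import org.apache.drill.exec.cache.DrillSerializable;
  import org.apache.drill.exec.cache.VectorAccessibleSerializable;

  public class WaitOrBuildSketch {
    static VectorAccessibleSerializable waitOrBuild(
        DistributedMap<VectorAccessibleSerializable> tableMap,
        String key, Runnable buildTable) throws InterruptedException {
      DrillSerializable wrap = null;
      for (int i = 0; i < 100 && wrap == null; i++) {
        Thread.sleep(10);                 // poll every 10 ms, ~1 s in total
        wrap = tableMap.get(key);
        if (wrap == null && i == 99) {    // nobody published it in time
          buildTable.run();               // build and publish it ourselves
          wrap = tableMap.get(key);
        }
      }
      return (VectorAccessibleSerializable) wrap;
    }
  }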
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 052f415..49732d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -70,7 +70,7 @@ public class Drillbit implements Closeable{
final DistributedCache cache;
final WorkManager manager;
final BootStrapContext context;
-
+
private volatile RegistrationHandle handle;
public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
@@ -86,15 +86,15 @@ public class Drillbit implements Closeable{
this.manager = new WorkManager(context);
this.coord = new ZKClusterCoordinator(config);
this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
- this.cache = new HazelCache(config);
+ this.cache = new HazelCache(config, context.getAllocator());
}
}
public void run() throws Exception {
coord.start(10000);
DrillbitEndpoint md = engine.start();
- cache.run();
manager.start(md, cache, engine.getBitCom(), coord);
+ cache.run();
handle = coord.register(md);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index ffc0274..94aa3dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -46,7 +46,7 @@ public class TestVectorCache {
Drillbit bit = new Drillbit(config, serviceSet);
bit.run();
DrillbitContext context = bit.getContext();
- HazelCache cache = new HazelCache(config);
+ HazelCache cache = new HazelCache(config, context.getAllocator());
cache.run();
MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
@@ -68,11 +68,11 @@ public class TestVectorCache {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
- VectorContainerSerializable wrap = new VectorContainerSerializable(container);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
- DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
+ DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
mmap.put("vectors", wrap);
- VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
+ VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
VectorAccessible newContainer = newWrap.get();
for (VectorWrapper w : newContainer) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
new file mode 100644
index 0000000..11d15d8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestWriteToDisk {
+
+ @Test
+ public void test() throws Exception {
+ List<ValueVector> vectorList = Lists.newArrayList();
+ RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ DrillConfig config = DrillConfig.create();
+ Drillbit bit = new Drillbit(config, serviceSet);
+ bit.run();
+ DrillbitContext context = bit.getContext();
+
+ MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
+ IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
+ MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
+ VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
+ AllocationHelper.allocate(intVector, 4, 4);
+ AllocationHelper.allocate(binVector, 4, 5);
+ vectorList.add(intVector);
+ vectorList.add(binVector);
+
+ intVector.getMutator().set(0, 0); binVector.getMutator().set(0, "ZERO".getBytes());
+ intVector.getMutator().set(1, 1); binVector.getMutator().set(1, "ONE".getBytes());
+ intVector.getMutator().set(2, 2); binVector.getMutator().set(2, "TWO".getBytes());
+ intVector.getMutator().set(3, 3); binVector.getMutator().set(3, "THREE".getBytes());
+ intVector.getMutator().setValueCount(4);
+ binVector.getMutator().setValueCount(4);
+
+ VectorContainer container = new VectorContainer();
+ container.addCollection(vectorList);
+ container.setRecordCount(4);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", "file:///");
+ FileSystem fs = FileSystem.get(conf);
+ Path path = new Path("/tmp/drillSerializable");
+ if (fs.exists(path)) fs.delete(path, false);
+ FSDataOutputStream out = fs.create(path);
+
+ wrap.writeToStream(out);
+ out.close();
+
+ FSDataInputStream in = fs.open(path);
+ VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(context.getAllocator());
+ newWrap.readFromStream(in);
+ fs.close();
+
+ VectorAccessible newContainer = newWrap.get();
+ for (VectorWrapper w : newContainer) {
+ ValueVector vv = w.getValueVector();
+ int values = vv.getAccessor().getValueCount();
+ for (int i = 0; i < values; i++) {
+ Object o = vv.getAccessor().getObject(i);
+ if (o instanceof byte[]) {
+ System.out.println(new String((byte[])o));
+ } else {
+ System.out.println(o);
+ }
+ }
+ }
+ }
+}
[08/10] git commit: Simple reformatting
Posted by ja...@apache.org.
Simple reformatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8a571b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8a571b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8a571b3e
Branch: refs/heads/master
Commit: 8a571b3ea04f439b3303d7154b1450abcb2bf320
Parents: b44b6c7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri Nov 8 09:20:28 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 09:20:28 2013 -0800
----------------------------------------------------------------------
.../physical/impl/trace/TraceBatchCreator.java | 16 +-
.../physical/impl/trace/TraceRecordBatch.java | 224 +++++++++----------
.../apache/drill/exec/record/WritableBatch.java | 52 +++--
3 files changed, 138 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index e857c25..a24ec70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -29,13 +29,13 @@ import com.google.common.base.Preconditions;
import java.util.List;
public class TraceBatchCreator implements BatchCreator<Trace> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
- //Preconditions.checkArgument(children.size() == 1);
- return new TraceRecordBatch(config, children.iterator().next(), context);
- }
-
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ // Preconditions.checkArgument(children.size() == 1);
+ return new TraceRecordBatch(config, children.iterator().next(), context);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index b73ddc1..1b990c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -65,139 +65,125 @@ import org.apache.hadoop.fs.Path;
* same set of value vectors (and selection vectors) to its parent record
* batch
*/
-public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
-{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
-
- private SelectionVector2 sv = null;
-
- /* Tag associated with each trace operator */
- final String traceTag;
-
- /* Location where the log should be dumped */
- private final String logLocation;
-
- /* File descriptors needed to be able to dump to log file */
- private OutputStream fos;
-
- public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
- {
- super(pop, context, incoming);
- this.traceTag = pop.traceTag;
- logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
- String fileName = getFileName();
-
- /* Create the log file we will dump to and initialize the file descriptors */
- try
- {
- Configuration conf = new Configuration();
- conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
- FileSystem fs = FileSystem.get(conf);
-
- /* create the file */
- fos = fs.create(new Path(fileName));
- } catch (IOException e)
- {
- logger.error("Unable to create file: " + fileName);
- }
- }
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+ private SelectionVector2 sv = null;
+
+ /* Tag associated with each trace operator */
+ final String traceTag;
+
+ /* Location where the log should be dumped */
+ private final String logLocation;
+
+ /* File descriptors needed to be able to dump to log file */
+ private OutputStream fos;
+
+ public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+ super(pop, context, incoming);
+ this.traceTag = pop.traceTag;
+ logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ String fileName = getFileName();
+
+ /* Create the log file we will dump to and initialize the file descriptors */
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+ FileSystem fs = FileSystem.get(conf);
- @Override
- public int getRecordCount()
- {
- if (sv == null)
- return incoming.getRecordCount();
- else
- return sv.getCount();
+ /* create the file */
+ fos = fs.create(new Path(fileName));
+ } catch (IOException e) {
+ logger.error("Unable to create file: " + fileName);
}
+ }
+
+ @Override
+ public int getRecordCount() {
+ if (sv == null)
+ return incoming.getRecordCount();
+ else
+ return sv.getCount();
+ }
+
+ /**
+ * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
+ * this record batch to a log file.
+ */
+ @Override
+ protected void doWork() {
+
+ boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+ if (incomingHasSv2) {
+ sv = incoming.getSelectionVector2();
+ } else {
+ sv = null;
+ }
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
+ : false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
+
+ try {
+ wrap.writeToStreamAndRetain(fos);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ batch.reconstructContainer(container);
+ }
- /**
- * Function is invoked for every record batch and it simply
- * dumps the buffers associated with all the value vectors in
- * this record batch to a log file.
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException {
+ /* Trace operator does not deal with hyper vectors yet */
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+ /*
+ * we have a new schema, clear our existing container to load the new value vectors
*/
- @Override
- protected void doWork()
- {
-
- boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
- if (incomingHasSv2) {
- sv = incoming.getSelectionVector2();
- } else {
- sv = null;
- }
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
- incoming, incomingHasSv2 ? true : false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
-
- try {
- wrap.writeToStreamAndRetain(fos);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- batch.reconstructContainer(container);
- }
+ container.clear();
- @Override
- protected void setupNewSchema() throws SchemaChangeException
- {
- /* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
- /* we have a new schema, clear our existing container to
- * load the new value vectors
- */
- container.clear();
-
- /* Add all the value vectors in the container */
- for(VectorWrapper<?> vv : incoming)
- {
- TransferPair tp = vv.getValueVector().getTransferPair();
- container.add(tp.getTo());
- }
+ /* Add all the value vectors in the container */
+ for (VectorWrapper<?> vv : incoming) {
+ TransferPair tp = vv.getValueVector().getTransferPair();
+ container.add(tp.getTo());
}
+ }
- @Override
- public SelectionVector2 getSelectionVector2() {
- return sv;
- }
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
- private String getFileName()
- {
- /* From the context, get the query id, major fragment id,
- * minor fragment id. This will be used as the file name
- * to which we will dump the incoming buffer data
- */
- FragmentHandle handle = incoming.getContext().getHandle();
+ private String getFileName() {
+ /*
+ * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
+ * which we will dump the incoming buffer data
+ */
+ FragmentHandle handle = incoming.getContext().getHandle();
- String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
- int majorFragmentId = handle.getMajorFragmentId();
- int minorFragmentId = handle.getMinorFragmentId();
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
- String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+ String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
- return fileName;
- }
+ return fileName;
+ }
+ @Override
+ protected void cleanup() {
+ /* Release the selection vector */
+ if (sv != null)
+ sv.clear();
- @Override
- protected void cleanup()
- {
- /* Release the selection vector */
- if (sv != null)
- sv.clear();
-
- /* Close the file descriptors */
- try
- {
- fos.close();
- } catch (IOException e)
- {
- logger.error("Unable to close file descriptors for file: " + getFileName());
- }
+ /* Close the file descriptors */
+ try {
+ fos.close();
+ } catch (IOException e) {
+ logger.error("Unable to close file descriptors for file: " + getFileName());
}
+ }
}
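With the trace directory from this patch series' drill-override.conf, getFileName() yields paths of the following shape. The query id and trace tag below are made up for illustration, and the doubled slash produced by the format string is harmless:

  public class TraceFileNameExample {
    public static void main(String[] args) {
      String logLocation = "/var/log/drill";               // drill.exec.trace.directory
      String qid = "11223344-5566-7788-99aa-bbccddeeff00"; // QueryIdHelper.getQueryId(...)
      int majorFragmentId = 0;
      int minorFragmentId = 0;
      String traceTag = "after-filter";                    // hypothetical tag

      // Same format string as getFileName().
      System.out.println(String.format("%s//%s_%s_%s_%s",
          logLocation, qid, majorFragmentId, minorFragmentId, traceTag));
      // -> /var/log/drill//11223344-5566-7788-99aa-bbccddeeff00_0_0_after-filter
    }
  }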
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index a33ca37..e9b56db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -60,17 +60,15 @@ public class WritableBatch {
return buffers;
}
- public void reconstructContainer(VectorContainer container)
- {
- Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
- if (buffers.length > 0) /* If we have ByteBuf's associated with value vectors */
- {
-
+ public void reconstructContainer(VectorContainer container) {
+ Preconditions.checkState(!cleared,
+ "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+ if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
+
CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
- /* Copy data from each buffer into the compound buffer */
- for (ByteBuf buf : buffers)
- {
+ /* Copy data from each buffer into the compound buffer */
+ for (ByteBuf buf : buffers) {
cbb.addComponent(buf);
}
@@ -78,13 +76,12 @@ public class WritableBatch {
int bufferOffset = 0;
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
- */
+ /*
+ * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+ */
int vectorIndex = 0;
- for(VectorWrapper<?> vv : container)
- {
+ for (VectorWrapper<?> vv : container) {
FieldMetadata fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -101,9 +98,8 @@ public class WritableBatch {
}
container.buildSchema(svMode);
- /* Set the record count in the value vector */
- for(VectorWrapper<?> v : container)
- {
+ /* Set the record count in the value vector */
+ for (VectorWrapper<?> v : container) {
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(def.getRecordCount());
}
@@ -118,23 +114,24 @@ public class WritableBatch {
public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
List<ValueVector> vectors = Lists.newArrayList();
- for(VectorWrapper<?> vw : vws){
+ for (VectorWrapper<?> vw : vws) {
Preconditions.checkArgument(!vw.isHyper());
vectors.add(vw.getValueVector());
}
return getBatchNoHV(recordCount, vectors, isSV2);
}
-
+
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
-
- // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
- if(recordCount == 0) continue;
-
+
+ // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
+ if (recordCount == 0)
+ continue;
+
for (ByteBuf b : vv.getBuffers()) {
buffers.add(b);
}
@@ -142,14 +139,15 @@ public class WritableBatch {
vv.clear();
}
- RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
+ .setIsSelectionVector2(isSV2).build();
WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
}
-
+
public static WritableBatch get(RecordBatch batch) {
- if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
+ if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
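Taken together, getBatchNoHV() detaches the buffers from the value vectors and reconstructContainer() later slices them back into a container. A minimal round-trip sketch, assuming an in-scope RecordBatch named incoming and a VectorContainer named container whose vectors match the batch schema:
    // Sketch only; allocator wiring and error handling omitted.
    WritableBatch batch = WritableBatch.get(incoming);  // rejects hyper selection vectors
    ByteBuf[] buffers = batch.getBuffers();             // the detached backing buffers
    batch.reconstructContainer(container);              // slices the buffers back into vectors
Note that reconstructContainer() may only be called before the batch is cleared, per the Preconditions check above.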
[09/10] git commit: DRILL-278: fix for Join predicate with CHAR types hit run-time error
Posted by ja...@apache.org.
DRILL-278: fix for Join predicate with CHAR types hit run-time error
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c5916653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c5916653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c5916653
Branch: refs/heads/master
Commit: c591665343b7640d82397f5668f98b54c9e789a9
Parents: 8a571b3
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Fri Nov 8 09:23:22 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 09:23:22 2013 -0800
----------------------------------------------------------------------
.../drill/exec/physical/impl/join/MergeJoinBatch.java | 9 ++-------
1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5916653/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1f5c707..1e20e91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -340,13 +340,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
// check value equality
- cg.getEvalBlock()._if(compareThisLeftExprHolder.getValue().eq(compareNextLeftExprHolder.getValue()))
- ._then()
- ._return(JExpr.lit(0));
-
- // no match if reached
- cg.getEvalBlock()._return(JExpr.lit(1));
-
+ FunctionCall g = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(compareThisLeftExprHolder), (LogicalExpression) new HoldingContainerExpression(compareNextLeftExprHolder)), ExpressionPosition.UNKNOWN);
+ cg.addExpr(new ReturnValueExpression(g, false), false);
// generate copyLeft()
//////////////////////
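The removed code declared two values equal only under a generated bitwise equality check, which mishandles variable-width types such as CHAR; the new code routes the comparison through the registered compare_to function. Roughly, the generated evaluation now behaves like this pseudo-Java (names are illustrative, not actual codegen output):
    // Illustrative only; the real comparison code is generated at runtime.
    int cmp = compare_to(thisLeftValue, nextLeftValue);  // type-aware, CHAR included
    return cmp;  // 0 means the keys match, non-zero means no match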
[10/10] git commit: DRILL-266: Build tools to interpret the output dumped by the diagnostic operator.
Posted by ja...@apache.org.
DRILL-266: Build tools to interpret the output dumped by the diagnostic operator.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c287fa60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c287fa60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c287fa60
Branch: refs/heads/master
Commit: c287fa604f6206752200d72359d3293c87078010
Parents: c591665
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Fri Nov 8 16:50:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 16:50:45 2013 -0800
----------------------------------------------------------------------
distribution/src/assemble/bin.xml | 4 +
distribution/src/resources/drill_dumpcat | 56 ++++
.../cache/VectorAccessibleSerializable.java | 3 +-
.../org/apache/drill/exec/client/DumpCat.java | 292 +++++++++++++++++++
.../drill/exec/client/QuerySubmitter.java | 33 +--
.../org/apache/drill/exec/util/VectorUtil.java | 68 +++++
.../apache/drill/exec/client/DumpCatTest.java | 133 +++++++++
7 files changed, 561 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 26de847..fdd6c70 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -110,6 +110,10 @@
<outputDirectory>bin</outputDirectory>
</file>
<file>
+ <source>src/resources/drill_dumpcat</source>
+ <outputDirectory>bin</outputDirectory>
+ </file>
+ <file>
<source>src/resources/drill-override.conf</source>
<outputDirectory>conf</outputDirectory>
</file>
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/distribution/src/resources/drill_dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill_dumpcat b/distribution/src/resources/drill_dumpcat
new file mode 100755
index 0000000..1747c9a
--- /dev/null
+++ b/distribution/src/resources/drill_dumpcat
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+if [ -z "$JAVA_HOME" ]
+then
+ JAVA=`which java`
+else
+ JAVA=`find -L "$JAVA_HOME" -name java | head -n 1`
+fi
+
+if [ -e "$JAVA" ]; then
+ echo ""
+else
+ echo "Java not found."
+ exit 1
+fi
+
+$JAVA -version 2>&1 | grep "version" | egrep -e "1.7" > /dev/null
+if [ $? -ne 0 ]; then
+ echo "Java 1.7 is required to run Apache Drill."
+ exit 1
+fi
+
+# get log directory
+if [ "$DRILL_LOG_DIR" = "" ]; then
+ export DRILL_LOG_DIR=/var/log/drill
+fi
+
+CP=$DRILL_HOME/jars/*:$CP
+CP=$DRILL_HOME/lib/*:$CP
+
+CP=$DRILL_CONF_DIR:$CP
+CP=$HADOOP_CLASSPATH:$CP
+
+DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
+
+exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat "$@"
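Once packaged under bin/ by the assembly change above, the script simply forwards its arguments to DumpCat; a hypothetical invocation (the dump path is invented; the options are defined in DumpCat.java below) would be: ./bin/drill_dumpcat -f /var/log/drill/<queryid>_0_0_mock-scan -batch 0 -include-headers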
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index e5bb94b..7b4bc23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -98,6 +98,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+
if (sv2 == null) {
sv2 = new SelectionVector2(allocator);
}
@@ -197,7 +198,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
public VectorAccessible get() {
return va;
}
-
+
public SelectionVector2 getSv2() {
return sv2;
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
new file mode 100644
index 0000000..ef0b1e1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.util.VectorUtil;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.internal.Lists;
+
+public class DumpCat {
+
+ private final static BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
+ public static void main(String args[]) throws Exception {
+ DumpCat dumpCat = new DumpCat();
+
+ Options o = new Options();
+ JCommander jc = null;
+ try {
+ jc = new JCommander(o, args);
+ jc.setProgramName("./drill_dumpcat");
+ } catch (ParameterException e) {
+ System.out.println(e.getMessage());
+ String[] valid = {"-f", "file"};
+ new JCommander(o, valid).usage();
+ jc.usage();
+ System.exit(-1);
+ }
+ if (o.help) {
+ jc.usage();
+ System.exit(0);
+ }
+
+ /*Check if dump file exists*/
+ File file = new File(o.location);
+ if (!file.exists()) {
+ System.out.println(String.format("Trace file %s not created", o.location));
+ System.exit(-1);
+ }
+
+ FileInputStream input = new FileInputStream(file.getAbsoluteFile());
+
+ if (o.batch < 0) {
+ dumpCat.doQuery(input);
+ } else {
+ dumpCat.doBatch(input, o.batch, o.include_headers);
+ }
+
+ input.close();
+ }
+
+ /**
+ * Used to ensure the param "batch" is a non-negative number.
+ */
+ public static class BatchNumValidator implements IParameterValidator {
+ @Override
+ public void validate(String name, String value) throws ParameterException {
+ try {
+ int batch = Integer.parseInt(value);
+ if (batch < 0) {
+ throw new ParameterException("Parameter " + name + " should be a non-negative number.");
+ }
+ } catch (NumberFormatException e) {
+ throw new ParameterException("Parameter " + name + " should be a non-negative number.");
+ }
+
+ }
+ }
+
+ /**
+ * Options as input to JCommander.
+ */
+ static class Options {
+ @Parameter(names = {"-f"}, description = "file containing dump", required=true)
+ public String location = null;
+
+ @Parameter(names = {"-batch"}, description = "id of batch to show", required=false, validateWith = BatchNumValidator.class)
+ public int batch = -1;
+
+ @Parameter(names = {"-include-headers"}, description = "whether include header of batch", required=false)
+ public boolean include_headers = false;
+
+ @Parameter(names = {"-h", "-help", "--help"}, description = "show usage", help=true)
+ public boolean help = false;
+ }
+
+ /**
+ * Contains: row count, selected-row count, and data size (in bytes).
+ */
+ private class BatchMetaInfo {
+ private long rows = 0;
+ private long selectedRows = 0;
+ private long dataSize = 0;
+
+ public BatchMetaInfo () {
+ }
+
+ public BatchMetaInfo (long rows, long selectedRows, long dataSize) {
+ this.rows = rows;
+ this.selectedRows = selectedRows;
+ this.dataSize = dataSize;
+ }
+
+ public void add(BatchMetaInfo info2) {
+ this.rows += info2.rows;
+ this.selectedRows += info2.selectedRows;
+ this.dataSize += info2.dataSize;
+ }
+
+ public String toString() {
+ String avgRecSizeStr = null;
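+ /* Average uses integer division: e.g. a hypothetical 12,345 bytes over 166 rows prints 74. */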
+ if (this.rows>0)
+ avgRecSizeStr = String.format("Average Record Size : %d ", this.dataSize/this.rows);
+ else
+ avgRecSizeStr = "Average Record Size : 0";
+
+ return String.format("Records : %d / %d \n", this.selectedRows, this.rows) +
+ avgRecSizeStr +
+ String.format("\n Total Data Size : %d", this.dataSize);
+ }
+ }
+
+ /**
+ * Query mode:
+ * $drill-dumpcat --file=local:///tmp/drilltrace/[queryid]_[tag]_[majorid]_[minor]_[operator]
+ * Batches: 135
+ * Records: 53,214/53,214 // the first number is the count of selected records; the second is the total record count.
+ * Selected Records: 53,214
+ * Average Record Size: 74 bytes
+ * Total Data Size: 12,345 bytes
+ * Number of Empty Batches: 1
+ * Schema changes: 1
+ * Schema change batch indices: 0
+ * @throws Exception
+ */
+ protected void doQuery(FileInputStream input) throws Exception{
+ int batchNum = 0;
+ int emptyBatchNum = 0;
+ BatchSchema prevSchema = null;
+ List<Integer> schemaChangeIdx = Lists.newArrayList();
+
+ BatchMetaInfo aggBatchMetaInfo = new BatchMetaInfo();
+
+ while (input.available() > 0) {
+ VectorAccessibleSerializable vcSerializable = new VectorAccessibleSerializable(DumpCat.allocator);
+ vcSerializable.readFromStream(input);
+ VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+
+ aggBatchMetaInfo.add(getBatchMetaInfo(vcSerializable));
+
+ if (vectorContainer.getRecordCount() == 0) {
+ emptyBatchNum++;
+ }
+
+ if (prevSchema != null && !vectorContainer.getSchema().equals(prevSchema))
+ schemaChangeIdx.add(batchNum);
+
+ prevSchema = vectorContainer.getSchema();
+ batchNum++;
+
+ vectorContainer.zeroVectors();
+ }
+
+ /* output the summary stat */
+ System.out.println(String.format("Total # of batches: %d", batchNum));
+ //output: rows, selectedRows, avg rec size, total data size.
+ System.out.println(aggBatchMetaInfo.toString());
+ System.out.println(String.format("Empty batch : %d", emptyBatchNum));
+ System.out.println(String.format("Schema changes : %d", schemaChangeIdx.size()));
+ System.out.println(String.format("Schema change batch index : %s", schemaChangeIdx.toString()));
+ }
+
+ /**
+ * Batch mode:
+ * $drill-dumpcat --file=local:///tmp/drilltrace/[queryid]_[tag]_[majorid]_[minor]_[operator] --batch=123 --include-headers=true
+ * Records: 1/1
+ * Average Record Size: 8 bytes
+ * Total Data Size: 8 bytes
+ * Schema Information
+ * name: col1, minor_type: int4, data_mode: nullable
+ * name: col2, minor_type: int4, data_mode: non-nullable
+ * @param targetBatchNum
+ * @throws Exception
+ */
+ protected void doBatch(FileInputStream input, int targetBatchNum, boolean showHeader) throws Exception {
+ int batchNum = -1;
+
+ VectorAccessibleSerializable vcSerializable = null;
+
+ while (input.available() > 0 && batchNum++ < targetBatchNum) {
+ vcSerializable = new VectorAccessibleSerializable(DumpCat.allocator);
+ vcSerializable.readFromStream(input);
+
+ if (batchNum != targetBatchNum) {
+ VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+ vectorContainer.zeroVectors();
+ }
+ }
+
+ if (batchNum < targetBatchNum) {
+ System.out.println(String.format("Wrong input of batch # ! Total # of batch in the file is %d. Please input a number 0..%d as batch #", batchNum+1, batchNum));
+ input.close();
+ System.exit(-1);
+ }
+
+ if (vcSerializable != null) {
+ showSingleBatch(vcSerializable, showHeader);
+ VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+ vectorContainer.zeroVectors();
+ }
+ }
+
+
+ private void showSingleBatch (VectorAccessibleSerializable vcSerializable, boolean showHeader) {
+ VectorContainer vectorContainer = (VectorContainer)vcSerializable.get();
+
+ /* show the header of the batch */
+ if (showHeader) {
+ System.out.println(getBatchMetaInfo(vcSerializable).toString());
+
+ System.out.println("Schema Information");
+ for (VectorWrapper w : vectorContainer) {
+ MaterializedField field = w.getValueVector().getField();
+ System.out.println(String.format("name : %s, minor_type : %s, data_mode : %s",
+ field.getName(),
+ field.getType().getMinorType().toString(),
+ field.isNullable() ? "nullable":"non-nullable"
+ ));
+ }
+ }
+
+ /* show the contents in the batch */
+ VectorUtil.showVectorAccessibleContent(vectorContainer);
+ }
+
+
+ /* Get batch meta info : rows, selectedRows, dataSize */
+ private BatchMetaInfo getBatchMetaInfo(VectorAccessibleSerializable vcSerializable) {
+ VectorAccessible vectorContainer = vcSerializable.get();
+
+ int rows = 0;
+ int selectedRows = 0;
+ int totalDataSize = 0;
+
+ rows = vectorContainer.getRecordCount();
+ selectedRows = rows;
+
+ if (vectorContainer.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+ selectedRows = vcSerializable.getSv2().getCount();
+ }
+
+ for (VectorWrapper w : vectorContainer) {
+ totalDataSize += w.getValueVector().getBufferSize();
+ }
+
+ return new BatchMetaInfo(rows, selectedRows, totalDataSize);
+ }
+
+}
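For orientation, a minimal sketch of driving DumpCat programmatically (the dump path is invented, and since doQuery()/doBatch() are protected this assumes a caller in the same package, as DumpCatTest below is):
    DumpCat dumpCat = new DumpCat();
    FileInputStream input = new FileInputStream("/var/log/drill/some-query-id_0_0_mock-scan");
    dumpCat.doQuery(input);            // summary statistics over all batches
    input.close();
    input = new FileInputStream("/var/log/drill/some-query-id_0_0_mock-scan");
    dumpCat.doBatch(input, 0, true);   // dump batch 0, including its header
    input.close();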
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index c34ab1f..554fad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -25,6 +25,7 @@ import com.beust.jcommander.internal.Lists;
import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.io.Resources;
+
import org.apache.commons.lang.StringUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.ValueVector;
import java.io.IOException;
@@ -156,35 +158,12 @@ public class QuerySubmitter {
} catch (SchemaChangeException e) {
submissionFailed(new RpcException(e));
}
- List<String> columns = Lists.newArrayList();
- for (VectorWrapper vw : loader) {
- columns.add(vw.getValueVector().getField().getName());
- }
- width = columns.size();
- for (int row = 0; row < rows; row++) {
- if (row%50 == 0) {
- System.out.println(StringUtils.repeat("-", width*17 + 1));
- for (String column : columns) {
- System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
- }
- System.out.printf("|\n");
- System.out.println(StringUtils.repeat("-", width*17 + 1));
- }
- for (VectorWrapper vw : loader) {
- Object o = vw.getValueVector().getAccessor().getObject(row);
- if (o instanceof byte[]) {
- String value = new String((byte[]) o);
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
- } else {
- String value = o.toString();
- System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
- }
- }
- System.out.printf("|\n");
- }
+
+ VectorUtil.showVectorAccessibleContent(loader);
}
+
if (result.getHeader().getIsLastChunk()) {
- System.out.println(StringUtils.repeat("-", width*17 + 1));
+ //System.out.println(StringUtils.repeat("-", width*17 + 1));
latch.countDown();
}
result.release();
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
new file mode 100644
index 0000000..8b23bcd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.beust.jcommander.internal.Lists;
+
+public class VectorUtil {
+
+ public static void showVectorAccessibleContent(VectorAccessible va) {
+
+ int rows = va.getRecordCount();
+ List<String> columns = Lists.newArrayList();
+ for (VectorWrapper vw : va) {
+ columns.add(vw.getValueVector().getField().getName());
+ }
+
+ int width = columns.size();
+ for (int row = 0; row < rows; row++) {
+ if (row%50 == 0) {
+ System.out.println(StringUtils.repeat("-", width*17 + 1));
+ for (String column : columns) {
+ System.out.printf("| %-15s", column.length() <= 15 ? column : column.substring(0, 14));
+ }
+ System.out.printf("|\n");
+ System.out.println(StringUtils.repeat("-", width*17 + 1));
+ }
+ for (VectorWrapper vw : va) {
+ Object o = vw.getValueVector().getAccessor().getObject(row);
+ if (o instanceof byte[]) {
+ String value = new String((byte[]) o);
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+ } else {
+ String value = o.toString();
+ System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
+ }
+ }
+ System.out.printf("|\n");
+ }
+
+ if (rows > 0)
+ System.out.println(StringUtils.repeat("-", width*17 + 1));
+ }
+
+
+}
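The printf pattern above allots 17 characters per column plus a closing bar, with a separator line every 50 rows. For a two-column batch the rendered table looks roughly like this (column names and values invented):
    -----------------------------------
    | col1           | col2           |
    -----------------------------------
    | 1              | foo            |
    -----------------------------------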
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
new file mode 100644
index 0000000..7a9784b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileInputStream;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+/**
+ * The unit test case will read a physical plan in json format. The physical plan contains a "trace" operator,
+ * which will produce a dump file. The dump file will be input into DumpCat to test query mode and batch mode.
+ */
+
+public class DumpCatTest {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DumpCatTest.class);
+ DrillConfig c = DrillConfig.create();
+
+ @Test
+ public void testDumpCat(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+ {
+
+ new NonStrictExpectations(){{
+ bitContext.getMetrics(); result = new MetricRegistry("test");
+ bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+ bitContext.getConfig(); result = c;
+ }};
+
+ PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+ PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
+ FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+ FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+ SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+ while(exec.next()){
+ }
+
+ if(context.getFailureCause() != null){
+ throw context.getFailureCause();
+ }
+ assertTrue(!context.isFailed());
+
+ FragmentHandle handle = context.getHandle();
+
+ /* Form the file name to which the trace output will dump the record batches */
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String logLocation = c.getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ System.out.println("Found log location: " + logLocation);
+
+ String filename = String.format("%s//%s_%d_%d_mock-scan", logLocation, qid, majorFragmentId, minorFragmentId);
+
+ System.out.println("File Name: " + filename);
+
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", c.getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
+
+ FileSystem fs = FileSystem.get(conf);
+ Path path = new Path(filename);
+ assertTrue("Trace file does not exist", fs.exists(path));
+
+ DumpCat dumpCat = new DumpCat();
+
+ //Test Query mode
+ FileInputStream input = new FileInputStream(filename);
+
+ dumpCat.doQuery(input);
+ input.close();
+
+ //Test Batch mode
+ input = new FileInputStream(filename);
+ dumpCat.doBatch(input,0,true);
+
+ input.close();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception{
+ // pause to get logger to catch up.
+ Thread.sleep(1000);
+ }
+}
[06/10] git commit: DRILL-271 abstract out serialization in trace record batch by using VectorAccessibleSerializable. Add ability to retain vectors.
Posted by ja...@apache.org.
DRILL-271 abstract out serialization in trace record batch by using VectorAccessibleSerializable. Add ability to retain vectors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0ac0b19b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0ac0b19b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0ac0b19b
Branch: refs/heads/master
Commit: 0ac0b19b4bd986e6323799ce54453336821a17f7
Parents: 90c302d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Oct 30 18:22:13 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:46 2013 -0700
----------------------------------------------------------------------
.../org/apache/drill/exec/ExecConstants.java | 1 +
.../cache/VectorAccessibleSerializable.java | 105 +++++++++++-
.../physical/impl/trace/TraceRecordBatch.java | 168 +++----------------
.../exec/record/AbstractSingleRecordBatch.java | 1 +
.../src/main/resources/drill-module.conf | 3 +-
.../impl/trace/TestTraceOutputDump.java | 59 +++----
6 files changed, 147 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3aec702..36504f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,4 +44,5 @@ public interface ExecConstants {
public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
+ public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index f4a6998..62f8097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -17,15 +17,18 @@
*/
package org.apache.drill.exec.cache;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.yammer.metrics.MetricRegistry;
import com.yammer.metrics.Timer;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -46,6 +49,11 @@ public class VectorAccessibleSerializable implements DrillSerializable {
private int recordCount = -1;
private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
private SelectionVector2 sv2;
+ private int incomingRecordCount;
+ private VectorContainer retainedVectorContainer;
+ private SelectionVector2 retainedSelectionVector;
+
+ private boolean retain = false;
/**
*
@@ -54,6 +62,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
this.va = va;
this.allocator = allocator;
+ incomingRecordCount = va.getRecordCount();
}
public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
@@ -61,6 +70,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
this.allocator = allocator;
this.sv2 = sv2;
if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ incomingRecordCount = va.getRecordCount();
}
public VectorAccessibleSerializable(BufferAllocator allocator) {
@@ -108,7 +118,8 @@ public class VectorAccessibleSerializable implements DrillSerializable {
@Override
public void writeToStream(OutputStream output) throws IOException {
- final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+ Preconditions.checkNotNull(output);
+ final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
ByteBuf[] incomingBuffers = batch.getBuffers();
@@ -147,7 +158,9 @@ public class VectorAccessibleSerializable implements DrillSerializable {
// fc.write(svBuf.nioBuffers());
svBuf.getBytes(0, output, svBuf.readableBytes());
- svBuf.release();
+ if (!retain) {
+ svBuf.release();
+ }
}
/* Dump the array of ByteBuf's associated with the value vectors */
@@ -161,11 +174,17 @@ public class VectorAccessibleSerializable implements DrillSerializable {
* we create a compound buffer
*/
totalBufferLength += buf.readableBytes();
- buf.release();
+ if (!retain) {
+ buf.release();
+ }
}
output.flush();
- context.stop();
+ if (retain) {
+ reconstructRecordBatch(batchDef, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+ }
+
+ timerContext.stop();
} catch (IOException e)
{
throw new RuntimeException(e);
@@ -174,10 +193,88 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
}
+ private void reconstructRecordBatch(UserBitShared.RecordBatchDef batchDef,
+ ByteBuf[] vvBufs, int totalBufferLength,
+ ByteBuf svBuf, int svCount, BatchSchema.SelectionVectorMode svMode)
+ {
+ VectorContainer container = retainedVectorContainer;
+ if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
+ {
+
+ CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+ /* Copy data from each buffer into the compound buffer */
+ for (int i = 0; i < vvBufs.length; i++)
+ {
+ cbb.addComponent(vvBufs[i]);
+ }
+
+ List<FieldMetadata> fields = batchDef.getFieldList();
+
+ int bufferOffset = 0;
+
+ /* For each value vector slice up the appropriate size from
+ * the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+
+ for(VectorWrapper<?> vv : container)
+ {
+ FieldMetadata fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+ }
+
+ /* Set the selection vector for the record batch if the
+ * incoming batch had a selection vector
+ */
+ if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+ {
+ if (sv2 == null)
+ sv2 = new SelectionVector2(allocator);
+
+ sv2.setRecordCount(svCount);
+
+ /* create our selection vector from the
+ * incoming selection vector's buffer
+ */
+ sv2.setBuffer(svBuf);
+
+ svBuf.release();
+ }
+
+ container.buildSchema(svMode);
+
+ /* Set the record count in the value vector */
+ for(VectorWrapper<?> v : container)
+ {
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(incomingRecordCount);
+ }
+ retainedVectorContainer = container;
+ }
+
private void clear() {
}
public VectorAccessible get() {
return va;
}
+
+ public void retain(VectorContainer container) {
+ this.retain = true;
+ this.retainedVectorContainer = container;
+ }
+
+ public VectorContainer getRetainedVectorContainer() {
+ return retainedVectorContainer;
+ }
+
+ public SelectionVector2 getSv2() {
+ return sv2;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index d2828cd..36c390c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.physical.impl.trace;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.util.List;
@@ -29,6 +30,7 @@ import java.util.Formatter;
import com.google.common.collect.Iterators;
import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -45,6 +47,10 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/* TraceRecordBatch contains value vectors which are exactly the same
* as the incoming record batch's value vectors. If the incoming
@@ -72,8 +78,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
private final String logLocation;
/* File descriptors needed to be able to dump to log file */
- private FileOutputStream fos;
- private FileChannel fc;
+ private OutputStream fos;
public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
{
@@ -86,14 +91,12 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Create the log file we will dump to and initialize the file descriptors */
try
{
- File file = new File(fileName);
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+ FileSystem fs = FileSystem.get(conf);
/* create the file */
- file.createNewFile();
-
- fos = new FileOutputStream(file, true);
- fc = fos.getChannel();
-
+ fos = fs.create(new Path(fileName));
} catch (IOException e)
{
logger.error("Unable to create file: " + fileName);
@@ -124,35 +127,17 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
@Override
protected void doWork()
{
- /* get the selection vector mode */
- SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
-
- /* Get the array of buffers from the incoming record batch */
- WritableBatch batch = incoming.getWritableBatch();
-
- ByteBuf[] incomingBuffers = batch.getBuffers();
- RecordBatchDef batchDef = batch.getDef();
-
- /* ByteBuf associated with the selection vector */
- ByteBuf svBuf = null;
-
- /* Size of the selection vector */
- int svCount = 0;
-
- if (svMode == SelectionVectorMode.TWO_BYTE)
- {
- SelectionVector2 sv2 = incoming.getSelectionVector2();
- svCount = sv2.getCount();
- svBuf = sv2.getBuffer();
- }
-
- /* Write the ByteBuf for the value vectors and selection vectors to disk
- * totalBufferLength is the sum of size of all the ByteBuf across all value vectors
- */
- int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
-
- /* Reconstruct the record batch from the ByteBuf's */
- reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(incoming,
+ incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? incoming.getSelectionVector2() : null,
+ context.getAllocator());
+ wrap.retain(container);
+
+ try {
+ wrap.writeToStream(fos);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ sv = wrap.getSv2();
}
@Override
@@ -198,114 +183,6 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
return fileName;
}
- private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
- {
- String fileName = getFileName();
- int totalBufferLength = 0;
-
- try
- {
- /* Write the metadata to the file */
- batchDef.writeDelimitedTo(fos);
-
- /* If we have a selection vector, dump it to file first */
- if (svBuf != null)
- {
-
- /* For writing to the selection vectors we use
- * setChar() method which does not modify the
- * reader and writer index. To copy the entire buffer
- * without having to get each byte individually we need
- * to set the writer index
- */
- svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
- fc.write(svBuf.nioBuffers());
- }
-
- /* Dump the array of ByteBuf's associated with the value vectors */
- for (ByteBuf buf : vvBufs)
- {
- /* dump the buffer into the file channel */
- fc.write(buf.nioBuffers());
-
- /* compute total length of buffer, will be used when
- * we create a compound buffer
- */
- totalBufferLength += buf.readableBytes();
- }
-
- fc.force(true);
- fos.flush();
-
- } catch (IOException e)
- {
- logger.error("Unable to write buffer to file: " + fileName);
- }
-
- return totalBufferLength;
- }
-
- private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
- ByteBuf[] vvBufs, int totalBufferLength,
- ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
- {
- if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
- {
- CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
-
- /* Copy data from each buffer into the compound buffer */
- for (int i = 0; i < vvBufs.length; i++)
- {
- cbb.addComponent(vvBufs[i]);
- }
-
- List<FieldMetadata> fields = batchDef.getFieldList();
-
- int bufferOffset = 0;
-
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
- */
- int vectorIndex = 0;
-
- for(VectorWrapper<?> vv : container)
- {
- FieldMetadata fmd = fields.get(vectorIndex);
- ValueVector v = vv.getValueVector();
- v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
- vectorIndex++;
- bufferOffset += fmd.getBufferLength();
- }
- }
-
- /* Set the selection vector for the record batch if the
- * incoming batch had a selection vector
- */
- if (svMode == SelectionVectorMode.TWO_BYTE)
- {
- if (sv == null)
- sv = new SelectionVector2(context.getAllocator());
-
- sv.setRecordCount(svCount);
-
- /* create our selection vector from the
- * incoming selection vector's buffer
- */
- sv.setBuffer(svBuf);
-
- svBuf.release();
- }
-
- container.buildSchema(svMode);
-
- /* Set the record count in the value vector */
- for(VectorWrapper<?> v : container)
- {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(incoming.getRecordCount());
- }
- }
@Override
protected void cleanup()
@@ -318,7 +195,6 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
try
{
fos.close();
- fc.close();
} catch (IOException e)
{
logger.error("Unable to close file descriptors for file: " + getFileName());
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index acc6c9d..1639940 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -47,6 +47,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
case NOT_YET:
case STOP:
container.zeroVectors();
+ cleanup();
return upstream;
case OK_NEW_SCHEMA:
try{
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index ca78eb5..725c6b4 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -68,6 +68,7 @@ drill.exec: {
executor.threads: 4
}
trace: {
- directory: "/var/log/drill"
+ directory: "/var/log/drill",
+ filesystem: "file:///"
}
}
\ No newline at end of file
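Both keys surface through ExecConstants (TRACE_DUMP_DIRECTORY, TRACE_DUMP_FILESYSTEM) and are resolved from DrillConfig at runtime, as DumpCatTest and TestTraceOutputDump do; a two-line sketch, assuming a DrillConfig instance c:
    String traceDir = c.getString(ExecConstants.TRACE_DUMP_DIRECTORY);   // "/var/log/drill"
    String traceFs = c.getString(ExecConstants.TRACE_DUMP_FILESYSTEM);   // "file:///"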
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index 92faf8d..4c04abc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -19,13 +19,13 @@ package org.apache.drill.exec.physical.impl.trace;
import static org.junit.Assert.*;
-import io.netty.buffer.ByteBufInputStream;
import mockit.Injectable;
import mockit.NonStrictExpectations;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -37,10 +37,14 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Test;
@@ -48,15 +52,6 @@ import com.google.common.base.Charsets;
import com.google.common.io.Files;
import com.yammer.metrics.MetricRegistry;
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.FileInputStream;
-
-import io.netty.buffer.ByteBuf;
-
/*
* This test uses a simple physical plan with a mock-scan that
* generates one row. The physical plan also consists of the
@@ -112,44 +107,30 @@ public class TestTraceOutputDump {
System.out.println("Found log location: " + logLocation);
- String filename = new String(logLocation + "/" + "mock-scan" + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
-
- System.out.println("File Name: " + filename);
+ String filename = String.format("%s//%s_%d_%d_mock-scan", logLocation, qid, majorFragmentId, minorFragmentId);
- File file = new File(filename);
+ System.out.println("File Name: " + filename);
- if (!file.exists())
- throw new IOException("Trace file not created");
-
- FileInputStream input = new FileInputStream(file.getAbsoluteFile());
- FileChannel fc = input.getChannel();
- int size = (int) fc.size();
- BufferAllocator allocator = context.getAllocator();
- ByteBuffer buffer = ByteBuffer.allocate((int) fc.size());
- ByteBuf buf = allocator.buffer(size);
-
- int readSize;
-
- /* Read the file into a ByteBuffer and transfer it into our ByteBuf */
- while ((readSize = (fc.read(buffer))) > 0)
- {
- buffer.position(0).limit(readSize);
- buf.writeBytes(buffer);
- buffer.clear();
- }
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", c.getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
- final ByteBufInputStream is = new ByteBufInputStream(buf, buf.readableBytes());
+ FileSystem fs = FileSystem.get(conf);
+ Path path = new Path(filename);
+ assertTrue("Trace file does not exist", fs.exists(path));
+ FSDataInputStream in = fs.open(path);
- RecordBatchDef batchDef = RecordBatchDef.parseDelimitedFrom(is);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(context.getAllocator());
+ wrap.readFromStream(in);
+ VectorAccessible container = wrap.get();
/* Assert there are no selection vectors */
- assertTrue(!batchDef.getIsSelectionVector2());
+ assertTrue(wrap.getSv2() == null);
/* Assert there is only one record */
- assertTrue(batchDef.getRecordCount() == 1);
+ assertTrue(container.getRecordCount() == 1);
/* Read the Integer value and ASSERT its Integer.MIN_VALUE */
- int value = buf.getInt(buf.readerIndex());
+ int value = (int) container.iterator().next().getValueVector().getAccessor().getObject(0);
assertTrue(value == Integer.MIN_VALUE);
}
[03/10] git commit: DRILL-271 retool VectorContainerSerializable to work with containers and batches. also modify DrillSerializable to work with InputStream, OutputStream
Posted by ja...@apache.org.
DRILL-271 retool VectorContainerSerializable to work with containers and batches. also modify DrillSerializable to work with InputStream, OutputStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/266d2483
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/266d2483
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/266d2483
Branch: refs/heads/master
Commit: 266d24836d079b5d4ddba013f04a79716da24d86
Parents: 30ada5d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Oct 28 17:34:05 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:31 2013 -0700
----------------------------------------------------------------------
.../drill/exec/cache/DrillSerializable.java | 10 +-
.../exec/cache/VectorContainerSerializable.java | 156 ++++++++++++++-----
.../drill/exec/client/QuerySubmitter.java | 2 +-
.../OrderedPartitionRecordBatch.java | 2 +
.../drill/exec/cache/TestVectorCache.java | 4 +-
5 files changed, 128 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 534d781..875e8b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,15 +17,15 @@
*/
package org.apache.drill.exec.cache;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.*;
/**
* Classes that can be put in the Distributed Cache must implement this interface.
*/
public interface DrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
- public void read(DataInput arg0) throws IOException;
- public void write (DataOutput arg0) throws IOException;
+ public void read(DataInput input) throws IOException;
+ public void readFromStream(InputStream input) throws IOException;
+ public void write(DataOutput output) throws IOException;
+ public void writeToStream(OutputStream output) throws IOException;
}
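A skeletal implementation, for orientation: the stream-based methods carry the real logic, while the DataInput/DataOutput variants adapt through the stream wrappers this patch already uses (the DataOutputOutputStream method name below is assumed by symmetry with DataInputInputStream.constructInputStream, which appears later in this diff):
    public class ExampleSerializable implements DrillSerializable {
      public void read(DataInput input) throws IOException {
        readFromStream(DataInputInputStream.constructInputStream(input));
      }
      public void readFromStream(InputStream input) throws IOException {
        // parse a delimited header, then the payload bytes
      }
      public void write(DataOutput output) throws IOException {
        writeToStream(DataOutputOutputStream.constructOutputStream(output)); // method name assumed
      }
      public void writeToStream(OutputStream output) throws IOException {
        // emit a delimited header, then the payload bytes
      }
    }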
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
index 1e6eeac..5813dd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
@@ -18,15 +18,18 @@
package org.apache.drill.exec.cache;
import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
import io.netty.buffer.ByteBuf;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -37,72 +40,147 @@ import java.util.List;
public class VectorContainerSerializable implements DrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
+ static final MetricRegistry metrics = DrillMetrics.getInstance();
+ static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
-// List<ValueVector> vectorList;
- private VectorContainer container;
+ private VectorAccessible va;
private BootStrapContext context;
- private int listSize = 0;
private int recordCount = -1;
+ private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+ private SelectionVector2 sv2;
/**
*
- * @param container
+ * @param va
*/
- public VectorContainerSerializable(VectorContainer container){
- this.container = container;
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- this.context = new BootStrapContext(DrillConfig.create());
- this.listSize = container.getNumberOfColumns();
+ public VectorContainerSerializable(VectorAccessible va){
+ this.va = va;
+ this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
+ }
+
+ public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
+ this.va = va;
+ this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
+ this.sv2 = sv2;
+ if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
}
public VectorContainerSerializable() {
- this.container = new VectorContainer();
- this.context = new BootStrapContext(DrillConfig.create());
+ this.va = new VectorContainer();
+ this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
}
-
+
@Override
public void read(DataInput input) throws IOException {
+ readFromStream(DataInputInputStream.constructInputStream(input));
+ }
+
+ @Override
+ public void readFromStream(InputStream input) throws IOException {
+ VectorContainer container = new VectorContainer();
+ UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+ recordCount = batchDef.getRecordCount();
+ if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+ sv2.allocateNew(recordCount * 2);
+ sv2.getBuffer().setBytes(0, input, recordCount * 2);
+ svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
List<ValueVector> vectorList = Lists.newArrayList();
- int size = input.readInt();
- InputStream stream = DataInputInputStream.constructInputStream(input);
- for (int i = 0; i < size; i++) {
- FieldMetadata metaData = FieldMetadata.parseDelimitedFrom(stream);
- if (recordCount == -1) recordCount = metaData.getValueCount();
+ List<FieldMetadata> fieldList = batchDef.getFieldList();
+ for (FieldMetadata metaData : fieldList) {
int dataLength = metaData.getBufferLength();
byte[] bytes = new byte[dataLength];
- input.readFully(bytes);
+ input.read(bytes);
MaterializedField field = MaterializedField.create(metaData.getDef());
ByteBuf buf = context.getAllocator().buffer(dataLength);
buf.setBytes(0, bytes);
ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
vector.load(metaData, buf);
- vectorList.add((BaseDataValueVector) vector);
+ vectorList.add(vector);
}
container.addCollection(vectorList);
- container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ container.buildSchema(svMode);
container.setRecordCount(recordCount);
- this.listSize = vectorList.size();
+ va = container;
}
@Override
public void write(DataOutput output) throws IOException {
-// int size = vectorList.size();
- output.writeInt(listSize);
- for (VectorWrapper w : container) {
- OutputStream stream = DataOutputOutputStream.constructOutputStream(output);
- ValueVector vector = w.getValueVector();
- if (recordCount == -1) container.setRecordCount(vector.getMetadata().getValueCount());
- vector.getMetadata().writeDelimitedTo(stream);
- ByteBuf[] data = vector.getBuffers();
- for (ByteBuf bb : data) {
- byte[] bytes = new byte[bb.readableBytes()];
- bb.getBytes(0, bytes);
- stream.write(bytes);
+ writeToStream(DataOutputOutputStream.constructOutputStream(output));
+ }
+
+ @Override
+ public void writeToStream(OutputStream output) throws IOException {
+ final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+ ByteBuf[] incomingBuffers = batch.getBuffers();
+ UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+ /* ByteBuf associated with the selection vector */
+ ByteBuf svBuf = null;
+
+ /* Size of the selection vector */
+ int svCount = 0;
+
+ if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+ {
+ svCount = sv2.getCount();
+ svBuf = sv2.getBuffer();
+ }
+
+ int totalBufferLength = 0;
+
+ try
+ {
+ /* Write the metadata to the file */
+ batchDef.writeDelimitedTo(output);
+
+ /* If we have a selection vector, dump it to file first */
+ if (svBuf != null)
+ {
+
+ /* For writing to the selection vectors we use
+ * setChar() method which does not modify the
+ * reader and writer index. To copy the entire buffer
+ * without having to get each byte individually we need
+ * to set the writer index
+ */
+ svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+// fc.write(svBuf.nioBuffers());
+ svBuf.getBytes(0, output, svBuf.readableBytes());
+ svBuf.release();
}
+
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (ByteBuf buf : incomingBuffers)
+ {
+ /* dump the buffer into the file channel */
+ int bufLength = buf.readableBytes();
+ buf.getBytes(0, output, bufLength);
+
+ /* compute total length of buffer, will be used when
+ * we create a compound buffer
+ */
+ totalBufferLength += buf.readableBytes();
+ buf.release();
+ }
+
+ output.flush();
+ context.stop();
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ } finally {
+ clear();
}
}
- public VectorContainer get() {
- return container;
+ private void clear() {
+ }
+
+ public VectorAccessible get() {
+ return va;
}
}
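Taken together, the new stream methods allow a simple round trip. A sketch against the API above (a populated VectorContainer with its record count set is assumed, as in TestVectorCache; no selection vector is involved):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.drill.exec.cache.VectorContainerSerializable;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.VectorContainer;

    public class RoundTripSketch {
      public static VectorAccessible roundTrip(VectorContainer container) throws IOException {
        // serialize: a delimited RecordBatchDef first, then the raw vector buffers
        VectorContainerSerializable wrap = new VectorContainerSerializable(container);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        wrap.writeToStream(out);

        // deserialize into a fresh container
        VectorContainerSerializable read = new VectorContainerSerializable();
        read.readFromStream(new ByteArrayInputStream(out.toByteArray()));
        return read.get();
      }
    }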
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 160ef7f..c34ab1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -149,8 +149,8 @@ public class QuerySubmitter {
@Override
public void resultArrived(QueryResultBatch result) {
int rows = result.getHeader().getRowCount();
- count.addAndGet(rows);
if (result.getData() != null) {
+ count.addAndGet(rows);
try {
loader.load(result.getHeader().getDef(), result.getData());
} catch (SchemaChangeException e) {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 7dc7d55..ef3886c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -147,6 +147,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
for (VectorWrapper vw : containerToCache) {
vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
}
+ containerToCache.setRecordCount(copier.getOutputRecords());
// Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
// into a serializable wrapper object, and then add to distributed map
@@ -236,6 +237,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
for (VectorWrapper vw : candidatePartitionTable) {
vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
}
+ candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index 39ec720..ffc0274 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.server.Drillbit;
@@ -66,13 +67,14 @@ public class TestVectorCache {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
+ container.setRecordCount(4);
VectorContainerSerializable wrap = new VectorContainerSerializable(container);
DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
mmap.put("vectors", wrap);
VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
- VectorContainer newContainer = newWrap.get();
+ VectorAccessible newContainer = newWrap.get();
for (VectorWrapper w : newContainer) {
ValueVector vv = w.getValueVector();
int values = vv.getAccessor().getValueCount();
[02/10] git commit: Addressed review comments from Jacques
Posted by ja...@apache.org.
Addressed review comments from Jacques
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/30ada5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/30ada5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/30ada5de
Branch: refs/heads/master
Commit: 30ada5decc9ab92fb95d075f2d01ab1387ce8c22
Parents: 6c78890
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 23:19:36 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:19 2013 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/TraceInjector.java | 2 +-
.../physical/impl/trace/TraceRecordBatch.java | 281 ++++++++++++-------
.../exec/record/AbstractSingleRecordBatch.java | 2 +-
.../apache/drill/exec/proto/UserBitShared.java | 42 +--
protocol/src/main/protobuf/UserBitShared.proto | 2 +-
5 files changed, 199 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 3e82a73..9c859a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -74,7 +74,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
/* For every child operator create a trace operator as its parent */
for (int i = 0; i < newChildren.size(); i++)
{
- String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+ String traceTag = newChildren.get(i).toString() + Integer.toString(traceTagCount++);
/* Trace operator */
Trace traceOp = new Trace(newChildren.get(i), traceTag);
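The one-line fix matters because newChildren.toString() rendered the entire child list, so every trace operator shared the same list-shaped tag prefix. A standalone illustration with plain Strings standing in for physical operators:

    import java.util.Arrays;
    import java.util.List;

    public class TraceTagDemo {
      public static void main(String[] args) {
        List<String> newChildren = Arrays.asList("Scan", "Filter");
        int traceTagCount = 0;
        for (int i = 0; i < newChildren.size(); i++) {
          // before the fix: newChildren.toString() + count -> "[Scan, Filter]0", "[Scan, Filter]1"
          // after the fix: each tag derives from the individual child
          String traceTag = newChildren.get(i) + Integer.toString(traceTagCount++);
          System.out.println(traceTag);  // prints Scan0 then Filter1
        }
      }
    }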
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 97131cd..d2828cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -24,7 +24,10 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Formatter;
+import com.google.common.collect.Iterators;
+import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -43,6 +46,19 @@ import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import io.netty.buffer.ByteBuf;
+/* TraceRecordBatch contains value vectors which are exactly the same
+ * as the incoming record batch's value vectors. If the incoming
+ * record batch has a selection vector (type 2) then TraceRecordBatch
+ * will also contain a selection vector.
+ *
+ * The purpose of this record batch is to dump the data associated with
+ * all the value vectors and the selection vector to disk.
+ *
+ * This record batch does not modify any data or schema; it simply
+ * consumes the incoming record batch's data, dumps it to disk and passes
+ * the same set of value vectors (and selection vectors) to its parent
+ * record batch.
+ */
public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
@@ -55,11 +71,33 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Location where the log should be dumped */
private final String logLocation;
+ /* File descriptors needed to be able to dump to log file */
+ private FileOutputStream fos;
+ private FileChannel fc;
+
public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
{
super(pop, context, incoming);
this.traceTag = pop.traceTag;
logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ String fileName = getFileName();
+
+ /* Create the log file we will dump to and initialize the file descriptors */
+ try
+ {
+ File file = new File(fileName);
+
+ /* create the file */
+ file.createNewFile();
+
+ fos = new FileOutputStream(file, true);
+ fc = fos.getChannel();
+
+ } catch (IOException e)
+ {
+ logger.error("Unable to create file: " + fileName);
+ }
}
@Override
@@ -75,6 +113,13 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
* Function is invoked for every record batch and it simply
* dumps the buffers associated with all the value vectors in
* this record batch to a log file.
+ *
+ * Function is divided into three main parts
+ * 1. Get all the buffers(ByteBuf's) associated with incoming
+ * record batch's value vectors and selection vector
+ * 2. Dump these buffers to the log file (performed by writeToFile())
+ * 3. Construct the record batch with these buffers to look exactly like
+ * the incoming record batch (performed by reconstructRecordBatch())
*/
@Override
protected void doWork()
@@ -85,37 +130,87 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Get the array of buffers from the incoming record batch */
WritableBatch batch = incoming.getWritableBatch();
- BufferAllocator allocator = context.getAllocator();
ByteBuf[] incomingBuffers = batch.getBuffers();
RecordBatchDef batchDef = batch.getDef();
- /* Total length of buffers across all value vectors */
- int totalBufferLength = 0;
+ /* ByteBuf associated with the selection vector */
+ ByteBuf svBuf = null;
- String fileName = getFileName();
+ /* Size of the selection vector */
+ int svCount = 0;
- try
+ if (svMode == SelectionVectorMode.TWO_BYTE)
{
- File file = new File(fileName);
+ SelectionVector2 sv2 = incoming.getSelectionVector2();
+ svCount = sv2.getCount();
+ svBuf = sv2.getBuffer();
+ }
- if (!file.exists())
- file.createNewFile();
+ /* Write the ByteBuf for the value vectors and selection vectors to disk
+ * totalBufferLength is the sum of size of all the ByteBuf across all value vectors
+ */
+ int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
- FileOutputStream fos = new FileOutputStream(file, true);
+ /* Reconstruct the record batch from the ByteBuf's */
+ reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+ }
+
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException
+ {
+ /* Trace operator does not deal with hyper vectors yet */
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+ /* we have a new schema, clear our existing container to
+ * load the new value vectors
+ */
+ container.clear();
+ /* Add all the value vectors in the container */
+ for(VectorWrapper<?> vv : incoming)
+ {
+ TransferPair tp = vv.getValueVector().getTransferPair();
+ container.add(tp.getTo());
+ }
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
+
+ private String getFileName()
+ {
+ /* From the context, get the query id, major fragment id,
+ * minor fragment id. This will be used as the file name
+ * to which we will dump the incoming buffer data
+ */
+ FragmentHandle handle = incoming.getContext().getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+
+ return fileName;
+ }
+
+ private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
+ {
+ String fileName = getFileName();
+ int totalBufferLength = 0;
+
+ try
+ {
/* Write the metadata to the file */
batchDef.writeDelimitedTo(fos);
- FileChannel fc = fos.getChannel();
-
/* If we have a selection vector, dump it to file first */
- if (svMode == SelectionVectorMode.TWO_BYTE)
+ if (svBuf != null)
{
- SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
- int recordCount = incomingSV2.getCount();
- int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
-
- ByteBuf buf = incomingSV2.getBuffer();
/* For writing to the selection vectors we use
* setChar() method which does not modify the
@@ -123,32 +218,16 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
* without having to get each byte individually we need
* to set the writer index
*/
- buf.writerIndex(sv2Size);
-
- /* dump the selection vector to log */
- dumpByteBuf(fc, buf);
-
- if (sv == null)
- sv = new SelectionVector2(allocator);
+ svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
- sv.setRecordCount(recordCount);
-
- /* create our selection vector from the
- * incoming selection vector's buffer
- */
- sv.setBuffer(buf);
-
- buf.release();
+ fc.write(svBuf.nioBuffers());
}
- /* For each buffer dump it to log and compute total length */
- for (ByteBuf buf : incomingBuffers)
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (ByteBuf buf : vvBufs)
{
/* dump the buffer into the file channel */
- dumpByteBuf(fc, buf);
-
- /* Reset reader index on the ByteBuf so we can read it again */
- buf.resetReaderIndex();
+ fc.write(buf.nioBuffers());
/* compute total length of buffer, will be used when
* we create a compound buffer
@@ -156,41 +235,66 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
totalBufferLength += buf.readableBytes();
}
- fc.close();
- fos.close();
+ fc.force(true);
+ fos.flush();
} catch (IOException e)
{
logger.error("Unable to write buffer to file: " + fileName);
}
- /* allocate memory for the compound buffer, compound buffer
- * will contain the data from all the buffers across all the
- * value vectors
- */
- ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+ return totalBufferLength;
+ }
- /* Copy data from each buffer into the compound buffer */
- for (int i = 0; i < incomingBuffers.length; i++)
+ private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
+ ByteBuf[] vvBufs, int totalBufferLength,
+ ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
+ {
+ if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
{
- byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
- }
+ CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+ /* Copy data from each buffer into the compound buffer */
+ for (int i = 0; i < vvBufs.length; i++)
+ {
+ cbb.addComponent(vvBufs[i]);
+ }
- List<FieldMetadata> fields = batchDef.getFieldList();
+ List<FieldMetadata> fields = batchDef.getFieldList();
- int bufferOffset = 0;
+ int bufferOffset = 0;
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
+ /* For each value vector slice up the appropriate size from
+ * the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+
+ for(VectorWrapper<?> vv : container)
+ {
+ FieldMetadata fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+ }
+
+ /* Set the selection vector for the record batch if the
+ * incoming batch had a selection vector
*/
- int vectorIndex = 0;
- for(VectorWrapper<?> vv : container)
+ if (svMode == SelectionVectorMode.TWO_BYTE)
{
- FieldMetadata fmd = fields.get(vectorIndex);
- ValueVector v = vv.getValueVector();
- v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
- vectorIndex++;
- bufferOffset += fmd.getBufferLength();
+ if (sv == null)
+ sv = new SelectionVector2(context.getAllocator());
+
+ sv.setRecordCount(svCount);
+
+ /* create our selection vector from the
+ * incoming selection vector's buffer
+ */
+ sv.setBuffer(svBuf);
+
+ svBuf.release();
}
container.buildSchema(svMode);
@@ -204,56 +308,21 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
}
@Override
- protected void setupNewSchema() throws SchemaChangeException
+ protected void cleanup()
{
- /* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
- /* we have a new schema, clear our existing container to
- * load the new value vectors
- */
- container.clear();
+ /* Release the selection vector */
+ if (sv != null)
+ sv.clear();
- /* Add all the value vectors in the container */
- for(VectorWrapper<?> vv : incoming)
+ /* Close the file descriptors */
+ try
{
- TransferPair tp = vv.getValueVector().getTransferPair();
- container.add(tp.getTo());
+ fos.close();
+ fc.close();
+ } catch (IOException e)
+ {
+ logger.error("Unable to close file descriptors for file: " + getFileName());
}
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- return sv;
- }
-
- private String getFileName()
- {
- /* From the context, get the query id, major fragment id,
- * minor fragment id. This will be used as the file name
- * to which we will dump the incoming buffer data
- */
- FragmentHandle handle = incoming.getContext().getHandle();
-
- String qid = QueryIdHelper.getQueryId(handle.getQueryId());
-
- int majorFragmentId = handle.getMajorFragmentId();
- int minorFragmentId = handle.getMinorFragmentId();
-
- return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
- }
-
- private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
- {
- int bufferLength = buf.readableBytes();
-
- byte[] byteArray = new byte[bufferLength];
-
- /* Transfer bytes to a byte array */
- buf.readBytes(byteArray);
-
- /* Drop the byte array into the file channel */
- fc.write(ByteBuffer.wrap(byteArray));
- }
}
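The writeToFile() above establishes the on-disk layout of a trace dump: a delimited RecordBatchDef, then the raw sv2 bytes if present, then each value vector's buffer back to back. A hypothetical reader for such a file (the class name is illustrative; it assumes one 2-byte sv2 entry per record, matching SelectionVector2.RECORD_SIZE):

    import java.io.FileInputStream;
    import java.io.InputStream;

    import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
    import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;

    public class TraceFileReader {
      public static void main(String[] args) throws Exception {
        try (InputStream in = new FileInputStream(args[0])) {
          // the metadata was written with writeDelimitedTo(), so parse it the same way
          RecordBatchDef def = RecordBatchDef.parseDelimitedFrom(in);
          if (def.hasIsSelectionVector2() && def.getIsSelectionVector2()) {
            in.skip(def.getRecordCount() * 2L);  // skip past the sv2 entries
          }
          for (FieldMetadata md : def.getFieldList()) {
            byte[] data = new byte[md.getBufferLength()];
            int read = in.read(data);            // this field's vector payload
            System.out.println(md.getDef() + ": " + read + " bytes");
          }
        }
      }
    }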
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 25284b6..acc6c9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
throw new UnsupportedOperationException();
}
}
-
+
protected abstract void setupNewSchema() throws SchemaChangeException;
protected abstract void doWork();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index f305c00..60dd6fd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2910,13 +2910,13 @@ public final class UserBitShared {
*/
int getRecordCount();
- // optional bool isSelectionVector2 = 3;
+ // optional bool is_selection_vector_2 = 3;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
boolean hasIsSelectionVector2();
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
boolean getIsSelectionVector2();
}
@@ -3084,17 +3084,17 @@ public final class UserBitShared {
return recordCount_;
}
- // optional bool isSelectionVector2 = 3;
- public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+ // optional bool is_selection_vector_2 = 3;
+ public static final int IS_SELECTION_VECTOR_2_FIELD_NUMBER = 3;
private boolean isSelectionVector2_;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean hasIsSelectionVector2() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean getIsSelectionVector2() {
return isSelectionVector2_;
@@ -3667,22 +3667,22 @@ public final class UserBitShared {
return this;
}
- // optional bool isSelectionVector2 = 3;
+ // optional bool is_selection_vector_2 = 3;
private boolean isSelectionVector2_ ;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean hasIsSelectionVector2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean getIsSelectionVector2() {
return isSelectionVector2_;
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public Builder setIsSelectionVector2(boolean value) {
bitField0_ |= 0x00000004;
@@ -3691,7 +3691,7 @@ public final class UserBitShared {
return this;
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public Builder clearIsSelectionVector2() {
bitField0_ = (bitField0_ & ~0x00000004);
@@ -4984,16 +4984,16 @@ public final class UserBitShared {
"ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
"ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
"art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
- "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
+ "_row\030\005 \001(\005\"\r\n\013RecordBatch\"p\n\016RecordBatch",
"Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
- "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
- "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
- "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
- " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
- "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
- "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
- "org.apache.drill.exec.protoB\rUserBitShar" +
- "edH\001"
+ "tadata\022\024\n\014record_count\030\002 \001(\005\022\035\n\025is_selec" +
+ "tion_vector_2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n" +
+ "\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_coun" +
+ "t\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013grou" +
+ "p_count\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005" +
+ "child\030\006 \003(\0132\032.exec.shared.FieldMetadataB" +
+ ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+ "haredH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 5bea284..0d98797 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,7 @@ message RecordBatch{
message RecordBatchDef {
repeated FieldMetadata field = 1;
optional int32 record_count = 2;
- optional bool isSelectionVector2 = 3;
+ optional bool is_selection_vector_2 = 3;
}
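The rename to is_selection_vector_2 follows protobuf's snake_case convention for field names; protoc camel-cases names when generating Java, so the accessors are unchanged and only the descriptor strings above differ. A small sketch against the regenerated class:

    import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;

    public class BatchDefDemo {
      public static void main(String[] args) {
        // field 3 is now named is_selection_vector_2 in the .proto,
        // but the generated builder/accessor names are the same as before
        RecordBatchDef def = RecordBatchDef.newBuilder()
            .setRecordCount(1)
            .setIsSelectionVector2(true)
            .build();
        System.out.println(def.getIsSelectionVector2());  // true
      }
    }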
[05/10] git commit: DRILL-271 release buffer in
VectorAccessibleSerializable read. Also read directly from the stream.
Posted by ja...@apache.org.
DRILL-271 release buffer in VectorAccessibleSerializable read. Also read directly from the stream.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/90c302de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/90c302de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/90c302de
Branch: refs/heads/master
Commit: 90c302def50ca74e7c4c2b2c40a75a8250f4df00
Parents: d529352
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Oct 30 11:25:10 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:42 2013 -0700
----------------------------------------------------------------------
.../apache/drill/exec/cache/VectorAccessibleSerializable.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c302de/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 24387d8..f4a6998 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -87,13 +87,12 @@ public class VectorAccessibleSerializable implements DrillSerializable {
List<FieldMetadata> fieldList = batchDef.getFieldList();
for (FieldMetadata metaData : fieldList) {
int dataLength = metaData.getBufferLength();
- byte[] bytes = new byte[dataLength];
- input.read(bytes);
MaterializedField field = MaterializedField.create(metaData.getDef());
ByteBuf buf = allocator.buffer(dataLength);
- buf.setBytes(0, bytes);
+ buf.writeBytes(input, dataLength);
ValueVector vector = TypeHelper.getNewVector(field, allocator);
vector.load(metaData, buf);
+ buf.release();
vectorList.add(vector);
}
container.addCollection(vectorList);
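The added release() balances the allocator.buffer() call, on the assumption that vector.load() retains its own reference to the buffer. A minimal sketch of that reference-counting idea using plain Netty buffers (not Drill code):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class RefCountDemo {
      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);  // allocation: refCnt == 1
        buf.retain();                       // a consumer such as vector.load() keeps a reference
        buf.release();                      // the loader drops the reference it created
        System.out.println(buf.refCnt());   // 1: the consumer still owns the data
      }
    }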
[07/10] git commit: DRILL-271 Address code review comments.
VectorAccessibleSerializable now takes WritableBatch. Release and reconstruct
code is now part of the WritableBatch class.
Posted by ja...@apache.org.
DRILL-271 Address code review comments. VectorAccessibleSerializable now takes WritableBatch. Release and reconstruct code is now part of the WritableBatch class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b44b6c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b44b6c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b44b6c71
Branch: refs/heads/master
Commit: b44b6c719bb4cf7159ef0cb0c6ec172b3273f681
Parents: 0ac0b19
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Oct 31 17:29:26 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:56 2013 -0700
----------------------------------------------------------------------
.../cache/VectorAccessibleSerializable.java | 181 ++++++-------------
.../OrderedPartitionRecordBatch.java | 6 +-
.../physical/impl/trace/TraceRecordBatch.java | 25 ++-
.../apache/drill/exec/record/WritableBatch.java | 58 ++++++
.../drill/exec/cache/TestVectorCache.java | 8 +-
.../drill/exec/cache/TestWriteToDisk.java | 8 +-
6 files changed, 133 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 62f8097..e5bb94b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -22,75 +22,87 @@ import com.google.common.collect.Lists;
import com.yammer.metrics.MetricRegistry;
import com.yammer.metrics.Timer;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.common.util.DataInputInputStream;
import org.apache.drill.common.util.DataOutputOutputStream;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import java.io.*;
import java.util.List;
+/**
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
+ * from an InputStream and construct a new VectorContainer.
+ */
public class VectorAccessibleSerializable implements DrillSerializable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
private VectorAccessible va;
+ private WritableBatch batch;
private BufferAllocator allocator;
private int recordCount = -1;
private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
private SelectionVector2 sv2;
- private int incomingRecordCount;
- private VectorContainer retainedVectorContainer;
- private SelectionVector2 retainedSelectionVector;
private boolean retain = false;
- /**
- *
- * @param va
- */
- public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
- this.va = va;
+ public VectorAccessibleSerializable(BufferAllocator allocator) {
this.allocator = allocator;
- incomingRecordCount = va.getRecordCount();
+ this.va = new VectorContainer();
}
- public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
- this.va = va;
- this.allocator = allocator;
- this.sv2 = sv2;
- if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
- incomingRecordCount = va.getRecordCount();
+ public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator){
+ this(batch, null, allocator);
}
- public VectorAccessibleSerializable(BufferAllocator allocator) {
- this.va = new VectorContainer();
+ /**
+ * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will never be released by this class, and ownership
+ * is maintained by caller.
+ * @param batch
+ * @param sv2
+ * @param allocator
+ */
+ public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) {
this.allocator = allocator;
+ if (batch != null) {
+ this.batch = batch;
+ }
+ if (sv2 != null) {
+ this.sv2 = sv2;
+ this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
}
@Override
public void read(DataInput input) throws IOException {
readFromStream(DataInputInputStream.constructInputStream(input));
}
-
+
+ /**
+ * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if it exists,
+ * then construct the vectors and add them to a vector container
+ * @param input the InputStream to read from
+ * @throws IOException
+ */
@Override
public void readFromStream(InputStream input) throws IOException {
VectorContainer container = new VectorContainer();
UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
recordCount = batchDef.getRecordCount();
if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
- sv2.allocateNew(recordCount * 2);
- sv2.getBuffer().setBytes(0, input, recordCount * 2);
+ if (sv2 == null) {
+ sv2 = new SelectionVector2(allocator);
+ }
+ sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+ sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
}
List<ValueVector> vectorList = Lists.newArrayList();
@@ -116,20 +128,27 @@ public class VectorAccessibleSerializable implements DrillSerializable {
writeToStream(DataOutputOutputStream.constructOutputStream(output));
}
+ public void writeToStreamAndRetain(OutputStream output) throws IOException {
+ retain = true;
+ writeToStream(output);
+ }
+
+ /**
+ * Serializes the VectorAccessible va and writes it to an output stream
+ * @param output the OutputStream to write to
+ * @throws IOException
+ */
@Override
public void writeToStream(OutputStream output) throws IOException {
Preconditions.checkNotNull(output);
final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
ByteBuf[] incomingBuffers = batch.getBuffers();
UserBitShared.RecordBatchDef batchDef = batch.getDef();
/* ByteBuf associated with the selection vector */
ByteBuf svBuf = null;
-
- /* Size of the selection vector */
- int svCount = 0;
+ Integer svCount = null;
if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
{
@@ -137,8 +156,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
svBuf = sv2.getBuffer();
}
- int totalBufferLength = 0;
-
try
{
/* Write the metadata to the file */
@@ -147,42 +164,20 @@ public class VectorAccessibleSerializable implements DrillSerializable {
/* If we have a selection vector, dump it to file first */
if (svBuf != null)
{
-
- /* For writing to the selection vectors we use
- * setChar() method which does not modify the
- * reader and writer index. To copy the entire buffer
- * without having to get each byte individually we need
- * to set the writer index
- */
- svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-// fc.write(svBuf.nioBuffers());
svBuf.getBytes(0, output, svBuf.readableBytes());
- if (!retain) {
- svBuf.release();
- }
+ sv2.setBuffer(svBuf);
+ sv2.setRecordCount(svCount);
}
/* Dump the array of ByteBuf's associated with the value vectors */
for (ByteBuf buf : incomingBuffers)
{
- /* dump the buffer into the file channel */
+ /* dump the buffer into the OutputStream */
int bufLength = buf.readableBytes();
buf.getBytes(0, output, bufLength);
-
- /* compute total length of buffer, will be used when
- * we create a compound buffer
- */
- totalBufferLength += buf.readableBytes();
- if (!retain) {
- buf.release();
- }
}
output.flush();
- if (retain) {
- reconstructRecordBatch(batchDef, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
- }
timerContext.stop();
} catch (IOException e)
@@ -193,86 +188,16 @@ public class VectorAccessibleSerializable implements DrillSerializable {
}
}
- private void reconstructRecordBatch(UserBitShared.RecordBatchDef batchDef,
- ByteBuf[] vvBufs, int totalBufferLength,
- ByteBuf svBuf, int svCount, BatchSchema.SelectionVectorMode svMode)
- {
- VectorContainer container = retainedVectorContainer;
- if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
- {
-
- CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
-
- /* Copy data from each buffer into the compound buffer */
- for (int i = 0; i < vvBufs.length; i++)
- {
- cbb.addComponent(vvBufs[i]);
- }
-
- List<FieldMetadata> fields = batchDef.getFieldList();
-
- int bufferOffset = 0;
-
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
- */
- int vectorIndex = 0;
-
- for(VectorWrapper<?> vv : container)
- {
- FieldMetadata fmd = fields.get(vectorIndex);
- ValueVector v = vv.getValueVector();
- v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
- vectorIndex++;
- bufferOffset += fmd.getBufferLength();
- }
- }
-
- /* Set the selection vector for the record batch if the
- * incoming batch had a selection vector
- */
- if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
- {
- if (sv2 == null)
- sv2 = new SelectionVector2(allocator);
-
- sv2.setRecordCount(svCount);
-
- /* create our selection vector from the
- * incoming selection vector's buffer
- */
- sv2.setBuffer(svBuf);
-
- svBuf.release();
- }
-
- container.buildSchema(svMode);
-
- /* Set the record count in the value vector */
- for(VectorWrapper<?> v : container)
- {
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(incomingRecordCount);
- }
- retainedVectorContainer = container;
- }
-
private void clear() {
+ if (!retain) {
+ batch.clear();
+ }
}
public VectorAccessible get() {
return va;
}
- public void retain(VectorContainer container) {
- this.retain = true;
- this.retainedVectorContainer = container;
- }
-
- public VectorContainer getRetainedVectorContainer() {
- return retainedVectorContainer;
- }
-
public SelectionVector2 getSv2() {
return sv2;
}
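Under the reworked API the wrapper is built from a WritableBatch rather than a bare VectorAccessible, mirroring the updated tests. A hedged round-trip sketch (allocator and container setup are assumed to follow TestVectorCache):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.WritableBatch;

    public class SerializableRoundTrip {
      public static VectorAccessible roundTrip(VectorContainer container, BufferAllocator allocator)
          throws IOException {
        WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
        VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, allocator);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        wrap.writeToStream(out);  // retain is false by default, so the batch buffers are released here

        VectorAccessibleSerializable read = new VectorAccessibleSerializable(allocator);
        read.readFromStream(new ByteArrayInputStream(out.toByteArray()));
        return read.get();        // a freshly constructed VectorContainer
      }
    }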
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index e39b82e..317e705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -160,7 +160,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
vectorList.add(vw.getValueVector());
}
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
mmap.put(mapKey, wrap);
wrap = null;
@@ -238,8 +239,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
}
candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 36c390c..b73ddc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -116,28 +116,27 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
* Function is invoked for every record batch and it simply
* dumps the buffers associated with all the value vectors in
* this record batch to a log file.
- *
- * Function is divided into three main parts
- * 1. Get all the buffers(ByteBuf's) associated with incoming
- * record batch's value vectors and selection vector
- * 2. Dump these buffers to the log file (performed by writeToFile())
- * 3. Construct the record batch with these buffers to look exactly like
- * the incoming record batch (performed by reconstructRecordBatch())
*/
@Override
protected void doWork()
{
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(incoming,
- incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? incoming.getSelectionVector2() : null,
- context.getAllocator());
- wrap.retain(container);
+
+ boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+ if (incomingHasSv2) {
+ sv = incoming.getSelectionVector2();
+ } else {
+ sv = null;
+ }
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
+ incoming, incomingHasSv2 ? true : false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
try {
- wrap.writeToStream(fos);
+ wrap.writeToStreamAndRetain(fos);
} catch (IOException e) {
throw new RuntimeException(e);
}
- sv = wrap.getSv2();
+ batch.reconstructContainer(container);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 76b79db..a33ca37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
+import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -37,6 +38,7 @@ public class WritableBatch {
private final RecordBatchDef def;
private final ByteBuf[] buffers;
+ private boolean cleared = false;
private WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
logger.debug("Created new writable batch with def {} and buffers {}", def, buffers);
@@ -58,6 +60,62 @@ public class WritableBatch {
return buffers;
}
+ public void reconstructContainer(VectorContainer container)
+ {
+ Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+ if (buffers.length > 0) /* If we have ByteBuf's associated with value vectors */
+ {
+
+ CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
+
+ /* Copy data from each buffer into the compound buffer */
+ for (ByteBuf buf : buffers)
+ {
+ cbb.addComponent(buf);
+ }
+
+ List<FieldMetadata> fields = def.getFieldList();
+
+ int bufferOffset = 0;
+
+ /* For each value vector slice up the appropriate size from
+ * the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+
+ for(VectorWrapper<?> vv : container)
+ {
+ FieldMetadata fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+ }
+
+ SelectionVectorMode svMode;
+ if (def.hasIsSelectionVector2() && def.getIsSelectionVector2()) {
+ svMode = SelectionVectorMode.TWO_BYTE;
+ } else {
+ svMode = SelectionVectorMode.NONE;
+ }
+ container.buildSchema(svMode);
+
+ /* Set the record count in the value vector */
+ for(VectorWrapper<?> v : container)
+ {
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(def.getRecordCount());
+ }
+ }
+
+ public void clear() {
+ for (ByteBuf buf : buffers) {
+ buf.release();
+ }
+ cleared = true;
+ }
+
public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
List<ValueVector> vectors = Lists.newArrayList();
for(VectorWrapper<?> vw : vws){
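clear() plus the Preconditions check encode a simple ownership contract: once a WritableBatch's buffers have been released, reconstruction must fail fast instead of touching freed memory. A short sketch of the contract (container setup assumed as in the tests):

    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.WritableBatch;

    public class WritableBatchContract {
      static void demo(VectorContainer container) {
        WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
        batch.clear();                            // releases every ByteBuf and marks the batch cleared
        // batch.reconstructContainer(container); // would now throw IllegalStateException:
        //   "Attempted to reconstruct a container from a WritableBatch after it had been cleared"
      }
    }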
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index 94aa3dd..cf274c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
@@ -68,7 +65,8 @@ public class TestVectorCache {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
mmap.put("vectors", wrap);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index 11d15d8..fb3e821 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
@@ -74,7 +71,8 @@ public class TestWriteToDisk {
VectorContainer container = new VectorContainer();
container.addCollection(vectorList);
container.setRecordCount(4);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
Configuration conf = new Configuration();
conf.set("fs.name.default", "file:///");