Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:18 UTC

[01/10] git commit: DRILL-256 revised patch

Updated Branches:
  refs/heads/master 964f01413 -> c287fa604


DRILL-256 revised patch


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/6c78890a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/6c78890a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/6c78890a

Branch: refs/heads/master
Commit: 6c78890a81e4d487926d9bfc3c0eb68edfa07524
Parents: 964f014
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 02:04:01 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:04 2013 -0700

----------------------------------------------------------------------
 distribution/src/resources/drill-override.conf  |   3 +
 .../org/apache/drill/exec/ExecConstants.java    |   1 +
 .../apache/drill/exec/ops/FragmentContext.java  |   5 +
 .../physical/base/AbstractPhysicalVisitor.java  |   5 +
 .../exec/physical/base/PhysicalVisitor.java     |   1 +
 .../drill/exec/physical/config/Trace.java       |  70 +++++
 .../drill/exec/physical/impl/ImplCreator.java   |  12 +-
 .../drill/exec/physical/impl/TraceInjector.java |  90 +++++++
 .../physical/impl/trace/TraceBatchCreator.java  |  41 +++
 .../physical/impl/trace/TraceRecordBatch.java   | 259 +++++++++++++++++++
 .../drill/exec/record/RecordBatchLoader.java    |   3 +-
 .../apache/drill/exec/record/WritableBatch.java |  15 +-
 .../exec/record/selection/SelectionVector2.java |  44 +++-
 .../src/main/resources/drill-module.conf        |   3 +
 .../impl/trace/TestTraceMultiRecordBatch.java   |  84 ++++++
 .../impl/trace/TestTraceOutputDump.java         | 161 ++++++++++++
 .../drill/exec/record/vector/TestLoad.java      |   2 +-
 .../trace/multi_record_batch_trace.json         |  49 ++++
 .../src/test/resources/trace/simple_trace.json  |  37 +++
 .../apache/drill/exec/proto/UserBitShared.java  | 100 ++++++-
 .../drill/exec/proto/helper/QueryIdHelper.java  |  32 +++
 protocol/src/main/protobuf/UserBitShared.proto  |   3 +-
 22 files changed, 993 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/distribution/src/resources/drill-override.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf
index c2ed9df..7694ced 100644
--- a/distribution/src/resources/drill-override.conf
+++ b/distribution/src/resources/drill-override.conf
@@ -71,4 +71,7 @@ drill.exec: {
     global.max.width: 100,
     executor.threads: 4
   }
+  trace: {
+    directory: "/var/log/drill"
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 72776d1..3aec702 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -43,4 +43,5 @@ public interface ExecConstants {
   public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
   public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
   public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
+  public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 352c467..a7f6d2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.ops;
 
 import java.io.IOException;
 
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.compile.ClassTransformer;
 import org.apache.drill.exec.compile.QueryClassLoader;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -141,4 +142,8 @@ public class FragmentContext {
   public QueryClassLoader getClassLoader(){
     return loader;
   }
+
+  public DrillConfig getConfig() {
+      return context.getConfig();
+  }
 }
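
Together with the new TRACE_DUMP_DIRECTORY constant above, this accessor lets an
operator resolve the dump location at runtime. A minimal sketch, assuming a
FragmentContext named "context" is in scope (this mirrors the TraceRecordBatch
constructor later in this patch):

    // Look up the trace dump directory from the fragment's DrillConfig;
    // drill-module.conf in this patch defaults it to /var/log/drill.
    String logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);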

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index bf6c68c..9e2ef0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -43,6 +43,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
   }
 
   @Override
+  public T visitTrace(Trace trace, X value) throws E{
+      return visitOp(trace, value);
+  }
+
+  @Override
   public T visitSort(Sort sort, X value) throws E{
     return visitOp(sort, value);
   }
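
Trace.accept (added below) completes the double dispatch, so a visitor that
cares about trace operators overrides only visitTrace, while every other
operator keeps falling through to visitOp. A hedged sketch of a visitor that
counts the trace operators in a plan (the counting logic is illustrative, not
part of this patch):

    PhysicalVisitor<Integer, Void, RuntimeException> counter =
        new AbstractPhysicalVisitor<Integer, Void, RuntimeException>() {
          @Override
          public Integer visitTrace(Trace trace, Void value) {
            return 1 + visitOp(trace, value);
          }
          @Override
          public Integer visitOp(PhysicalOperator op, Void value) {
            // Sum over children; a PhysicalOperator is iterable over its children.
            int count = 0;
            for (PhysicalOperator child : op) {
              count += child.accept(this, value);
            }
            return count;
          }
        };
    int traces = root.accept(counter, null);  // root: any PhysicalOperator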

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 5692b9f..2474c15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -37,6 +37,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
   public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
   public RETURN visitUnion(Union union, EXTRA value) throws EXCEP;
   public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
+  public RETURN visitTrace(Trace trace, EXTRA value) throws EXCEP;
   public RETURN visitSort(Sort sort, EXTRA value) throws EXCEP;
   public RETURN visitLimit(Limit limit, EXTRA value) throws EXCEP;
   public RETURN visitMergeJoin(MergeJoinPOP join, EXTRA value) throws EXCEP;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
new file mode 100644
index 0000000..a81d3e9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/Trace.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.Size;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("trace")
+public class Trace extends AbstractSingle {
+
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Trace.class);
+
+    /* Tag associated with each trace operator
+     * Printed along with trace output to distinguish
+     * between multiple trace operators within the same plan
+     */
+    public final String traceTag;
+
+    public Trace(@JsonProperty("child") PhysicalOperator child, @JsonProperty("tag") String traceTag) {
+        super(child);
+        this.traceTag = traceTag;
+    }
+
+    @Override
+    public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+        return physicalVisitor.visitTrace(this, value);
+    }
+
+    @Override
+    public OperatorCost getCost() {
+
+        /* Compute the total size (row count * row size) */
+        Size size     = child.getSize();
+        long diskSize = size.getRecordCount() * size.getRecordSize();
+
+        return new OperatorCost(0, diskSize, 0, child.getSize().getRecordCount());
+    }
+
+    @Override
+    public Size getSize() {
+        return child.getSize();
+    }
+
+    @Override
+    protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+        return new Trace(child, traceTag);
+    }
+}
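
Because of the @JsonTypeName annotation, the operator can be written directly
into a JSON physical plan; the "tag" property binds to traceTag. An illustrative
fragment (tag value hypothetical; compare the test plans later in this patch):

    {
        pop: "trace",
        @id: 2,
        child: 1,
        tag: "after-scan"
    }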

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 844b3a7..67e9452 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderCreato
 import org.apache.drill.exec.physical.impl.project.ProjectBatchCreator;
 import org.apache.drill.exec.physical.impl.sort.SortBatchCreator;
 import org.apache.drill.exec.physical.impl.svremover.SVRemoverCreator;
+import org.apache.drill.exec.physical.impl.trace.TraceBatchCreator;
 import org.apache.drill.exec.physical.impl.union.UnionBatchCreator;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.store.json.JSONScanBatchCreator;
@@ -73,9 +74,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   private AggBatchCreator abc = new AggBatchCreator();
   private MergeJoinCreator mjc = new MergeJoinCreator();
   private RootExec root = null;
-  
+  private TraceBatchCreator tbc = new TraceBatchCreator();
+
   private ImplCreator(){}
-  
+
   public RootExec getRoot(){
     return root;
   }
@@ -86,6 +88,10 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
   }
 
   @Override
+  public RecordBatch visitTrace(Trace op, FragmentContext context) throws ExecutionSetupException {
+    return tbc.getBatch(context, op, getChildren(op, context));
+  }
+  @Override
   public RecordBatch visitSubScan(SubScan subScan, FragmentContext context) throws ExecutionSetupException {
     Preconditions.checkNotNull(subScan);
     Preconditions.checkNotNull(context);
@@ -153,7 +159,7 @@ public class ImplCreator extends AbstractPhysicalVisitor<RecordBatch, FragmentCo
     return fbc.getBatch(context, filter, getChildren(filter, context));
   }
 
-  
+
   @Override
   public RecordBatch visitStreamingAggregate(StreamingAggregate config, FragmentContext context)
       throws ExecutionSetupException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
new file mode 100644
index 0000000..3e82a73
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.Trace;
+
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+
+public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, FragmentContext, ExecutionSetupException> {
+
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceInjector.class);
+
+    static int traceTagCount = 0;
+
+
+    RootExec root = null;
+    private ScreenCreator sc = new ScreenCreator();
+
+    public static PhysicalOperator getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+        TraceInjector tI = new TraceInjector();
+        PhysicalOperator newOp = root.accept(tI, context);
+
+        return newOp;
+    }
+
+    /**
+     * Traverse the physical plan and inject a trace operator between
+     * every operator and each of its children.
+     * @param op Physical operator below which the trace operators will be injected
+     * @param context Fragment context
+     * @return the same physical operator as was passed in, but with each of
+     *         its children replaced by a trace operator wrapping that child
+     * @throws ExecutionSetupException
+     */
+    @Override
+    public PhysicalOperator visitOp(PhysicalOperator op, FragmentContext context) throws ExecutionSetupException{
+
+        List<PhysicalOperator> newChildren = Lists.newArrayList();
+        List<PhysicalOperator> list = null;
+        PhysicalOperator newOp = op;
+
+        /* Get the list of child operators */
+        for (PhysicalOperator child : op)
+        {
+            newChildren.add(child.accept(this, context));
+        }
+
+        list = Lists.newArrayList();
+
+        /* For every child operator create a trace operator as its parent */
+        for (int i = 0; i < newChildren.size(); i++)
+        {
+            String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+
+            /* Trace operator */
+            Trace traceOp = new Trace(newChildren.get(i), traceTag);
+            list.add(traceOp);
+        }
+
+        /* Inject trace operator */
+        if (list.size() > 0)
+            newOp = op.getNewWithChildren(list);
+
+        return newOp;
+    }
+}
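
Concretely, the rewrite wraps every parent-child edge in the plan, so a simple
chain (operator names illustrative) is transformed as:

    before:  screen -> filter -> scan
    after:   screen -> trace -> filter -> trace -> scan

Note that each generated tag embeds newChildren.toString() plus a running
counter, so tags are unique but can get long for operators with many children.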

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
new file mode 100644
index 0000000..e857c25
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.trace;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import com.google.common.base.Preconditions;
+
+import java.util.List;
+
+public class TraceBatchCreator implements BatchCreator<Trace> {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+    @Override
+    public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
+        //Preconditions.checkArgument(children.size() == 1);
+        return new TraceRecordBatch(config, children.iterator().next(), context);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
new file mode 100644
index 0000000..97131cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.trace;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.Trace;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
+
+import io.netty.buffer.ByteBuf;
+
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
+{
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+    private SelectionVector2 sv = null;
+
+    /* Tag associated with each trace operator */
+    final String traceTag;
+
+    /* Location where the log should be dumped */
+    private final String logLocation;
+
+    public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
+    {
+        super(pop, context, incoming);
+        this.traceTag = pop.traceTag;
+        logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+    }
+
+    @Override
+    public int getRecordCount()
+    {
+        if (sv == null)
+            return incoming.getRecordCount();
+        else
+           return sv.getCount();
+    }
+
+    /**
+     * Invoked for every record batch, this method simply dumps the
+     * buffers associated with all the value vectors in this record
+     * batch to a log file.
+     */
+    @Override
+    protected void doWork()
+    {
+        /* get the selection vector mode */
+        SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
+
+        /* Get the array of buffers from the incoming record batch */
+        WritableBatch batch = incoming.getWritableBatch();
+
+        BufferAllocator allocator = context.getAllocator();
+        ByteBuf[] incomingBuffers = batch.getBuffers();
+        RecordBatchDef batchDef = batch.getDef();
+
+        /* Total length of buffers across all value vectors */
+        int totalBufferLength = 0;
+
+        String fileName = getFileName();
+
+        try
+        {
+            File file = new File(fileName);
+
+            if (!file.exists())
+                file.createNewFile();
+
+            FileOutputStream fos = new FileOutputStream(file, true);
+
+            /* Write the metadata to the file */
+            batchDef.writeDelimitedTo(fos);
+
+            FileChannel fc = fos.getChannel();
+
+            /* If we have a selection vector, dump it to file first */
+            if (svMode == SelectionVectorMode.TWO_BYTE)
+            {
+                SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
+                int recordCount = incomingSV2.getCount();
+                int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
+
+                ByteBuf buf = incomingSV2.getBuffer();
+
+                /* Writes to the selection vector use the setChar()
+                 * method, which does not advance the reader or writer
+                 * index. To copy the entire buffer without having to
+                 * read each byte individually we therefore need to
+                 * set the writer index explicitly.
+                 */
+                buf.writerIndex(sv2Size);
+
+                /* dump the selection vector to log */
+                dumpByteBuf(fc, buf);
+
+                if (sv == null)
+                    sv = new SelectionVector2(allocator);
+
+                sv.setRecordCount(recordCount);
+
+                /* create our selection vector from the
+                 * incoming selection vector's buffer
+                 */
+                sv.setBuffer(buf);
+
+                buf.release();
+            }
+
+            /* For each buffer dump it to log and compute total length */
+            for (ByteBuf buf : incomingBuffers)
+            {
+                /* dump the buffer into the file channel */
+                dumpByteBuf(fc, buf);
+
+                /* Reset reader index on the ByteBuf so we can read it again */
+                buf.resetReaderIndex();
+
+                /* compute total length of buffer, will be used when
+                 * we create a compound buffer
+                 */
+                totalBufferLength += buf.readableBytes();
+            }
+
+            fc.close();
+            fos.close();
+
+        } catch (IOException e)
+        {
+            logger.error("Unable to write buffer to file: " + fileName);
+        }
+
+        /* Allocate memory for the compound buffer; it will contain
+         * the data from all the buffers across all the value
+         * vectors.
+         */
+        ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+
+        /* Copy data from each buffer into the compound buffer */
+        for (int i = 0; i < incomingBuffers.length; i++)
+        {
+            byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
+        }
+
+        List<FieldMetadata> fields = batchDef.getFieldList();
+
+        int bufferOffset = 0;
+
+        /* For each value vector slice up the appropriate size from
+         * the compound buffer and load it into the value vector
+         */
+        int vectorIndex = 0;
+        for(VectorWrapper<?> vv : container)
+        {
+            FieldMetadata fmd = fields.get(vectorIndex);
+            ValueVector v = vv.getValueVector();
+            v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
+            vectorIndex++;
+            bufferOffset += fmd.getBufferLength();
+        }
+
+        container.buildSchema(svMode);
+
+        /* Set the record count in the value vector */
+        for(VectorWrapper<?> v : container)
+        {
+            ValueVector.Mutator m = v.getValueVector().getMutator();
+            m.setValueCount(incoming.getRecordCount());
+        }
+    }
+
+    @Override
+    protected void setupNewSchema() throws SchemaChangeException
+    {
+        /* Trace operator does not deal with hyper vectors yet */
+        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+        /* we have a new schema, clear our existing container to
+         * load the new value vectors
+         */
+        container.clear();
+
+        /* Add all the value vectors in the container */
+        for(VectorWrapper<?> vv : incoming)
+        {
+            TransferPair tp = vv.getValueVector().getTransferPair();
+            container.add(tp.getTo());
+        }
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+        return sv;
+    }
+
+    private String getFileName()
+    {
+        /* From the context, get the query id, major fragment id,
+         * and minor fragment id. These are combined to form the name
+         * of the file to which we dump the incoming buffer data.
+         */
+        FragmentHandle handle = incoming.getContext().getHandle();
+
+        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+        int majorFragmentId = handle.getMajorFragmentId();
+        int minorFragmentId = handle.getMinorFragmentId();
+
+        return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
+    }
+
+    private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
+    {
+        int bufferLength = buf.readableBytes();
+
+        byte[] byteArray = new byte[bufferLength];
+
+        /* Transfer bytes to a byte array */
+        buf.readBytes(byteArray);
+
+        /* Drop the byte array into the file channel */
+        fc.write(ByteBuffer.wrap(byteArray));
+    }
+}
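
Reading doWork as a whole, each incoming batch is appended to the per-fragment
trace file in the following layout (a summary of the code above, not a
separately documented format):

    [RecordBatchDef, varint-delimited protobuf]   <- batchDef.writeDelimitedTo(fos)
    [SV2 buffer, recordCount * 2 bytes]           <- only in TWO_BYTE sv mode
    [value vector buffers, back to back]          <- one per FieldMetadata entry
    ...repeated for every incoming batch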

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 4057c58..016f340 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -143,7 +143,8 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
   }
   
   public WritableBatch getWritableBatch(){
-    return WritableBatch.getBatchNoSVWrap(valueCount, container);
+    boolean isSV2 = (schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE);
+    return WritableBatch.getBatchNoHVWrap(valueCount, container, isSV2);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 5fb9b0a..76b79db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -58,16 +58,16 @@ public class WritableBatch {
     return buffers;
   }
 
-  public static WritableBatch getBatchNoSVWrap(int recordCount, Iterable<VectorWrapper<?>> vws) {
+  public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
     List<ValueVector> vectors = Lists.newArrayList();
     for(VectorWrapper<?> vw : vws){
       Preconditions.checkArgument(!vw.isHyper());
       vectors.add(vw.getValueVector());
     }
-    return getBatchNoSV(recordCount, vectors);
+    return getBatchNoHV(recordCount, vectors, isSV2);
   }
   
-  public static WritableBatch getBatchNoSV(int recordCount, Iterable<ValueVector> vectors) {
+  public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
 
@@ -84,14 +84,17 @@ public class WritableBatch {
       vv.clear();
     }
 
-    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).build();
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
     WritableBatch b = new WritableBatch(batchDef, buffers);
     return b;
   }
   
   public static WritableBatch get(RecordBatch batch) {
-    if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE) throw new UnsupportedOperationException("Only batches without selections vectors are writable.");
-    return getBatchNoSVWrap(batch.getRecordCount(), batch);
+    if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+        throw new UnsupportedOperationException("Only batches without hyper selection vectors are writable.");
+
+    boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
+    return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
   }
 
 }
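
With the flag carried in RecordBatchDef, batches that use a two-byte selection
vector are now serializable. A hedged usage sketch, where "batch" stands for
any RecordBatch without hyper vectors:

    // The SV2 flag travels with the batch definition, so a reader knows an
    // SV2 buffer precedes the vector data in the serialized form.
    WritableBatch wb = WritableBatch.get(batch);
    boolean hasSv2 = wb.getDef().getIsSelectionVector2();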

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 6105ead..6ff6cf2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -35,6 +35,8 @@ public class SelectionVector2 implements Closeable{
   private int recordCount;
   private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
 
+  public static final int RECORD_SIZE = 2;
+
   public SelectionVector2(BufferAllocator allocator) {
     this.allocator = allocator;
   }
@@ -43,24 +45,54 @@ public class SelectionVector2 implements Closeable{
     return recordCount;
   }
 
+  public ByteBuf getBuffer()
+  {
+      ByteBuf bufferHandle = this.buffer;
+
+      /* Increment the ref count for this buffer */
+      bufferHandle.retain();
+
+      /* We are passing ownership of the buffer to the
+       * caller. clear the buffer from within our selection vector
+       */
+      clear();
+
+      return bufferHandle;
+  }
+
+  public void setBuffer(ByteBuf bufferHandle)
+  {
+      /* clear the existing buffer */
+      clear();
+
+      this.buffer = bufferHandle;
+      buffer.retain();
+  }
+
   public char getIndex(int index){
     
-    return buffer.getChar(index*2);
+    return buffer.getChar(index * RECORD_SIZE);
   }
 
   public void setIndex(int index, char value){
-    buffer.setChar(index*2, value);
+    buffer.setChar(index * RECORD_SIZE, value);
   }
   
   public void allocateNew(int size){
     clear();
-    buffer = allocator.buffer(size * 2);
+    buffer = allocator.buffer(size * RECORD_SIZE);
   }
-  
+
   public SelectionVector2 clone(){
     SelectionVector2 newSV = new SelectionVector2(allocator);
     newSV.recordCount = recordCount;
     newSV.buffer = buffer;
+
+    /* Since buffer and newSV.buffer essentially point to the
+     * same buffer, if we don't do a retain() on the newSV's
+     * buffer, it might get freed.
+     */
+    newSV.buffer.retain();
     clear();
     return newSV;
   }
@@ -82,6 +114,6 @@ public class SelectionVector2 implements Closeable{
   public void close() throws IOException {
     clear();
   }
-  
-  
+
+
 }
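
The getBuffer()/setBuffer() pair implements an explicit ownership handoff on
top of Netty's reference counting. A hedged sketch of the contract, assuming a
BufferAllocator named "allocator" is in scope:

    // getBuffer() retains and then clears the source vector, so the caller
    // owns exactly one reference and must release it when done.
    SelectionVector2 sv2 = new SelectionVector2(allocator);
    sv2.allocateNew(4);
    sv2.setIndex(0, (char) 7);

    ByteBuf buf = sv2.getBuffer();      // sv2 gives up its buffer
    SelectionVector2 copy = new SelectionVector2(allocator);
    copy.setRecordCount(1);
    copy.setBuffer(buf);                // copy retains its own reference
    buf.release();                      // drop the reference from getBuffer()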

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7e40a05..ca78eb5 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -67,4 +67,7 @@ drill.exec: {
     global.max.width: 100,
     executor.threads: 4
   }
+  trace: {
+    directory: "/var/log/drill"
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
new file mode 100644
index 0000000..ea14b08
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.trace;
+
+import static org.junit.Assert.*;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+/*
+ * This test uses a physical plan with a mock scan that generates 100k records.
+ * Here we inject the "trace" operator at two locations in the plan.
+ *
+ * The goal of this test is to make sure the trace operator works correctly
+ * across multiple record batches and when a selection vector is present in
+ * the incoming container of the trace operator.
+ */
+public class TestTraceMultiRecordBatch {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTraceMultiRecordBatch.class);
+    DrillConfig c = DrillConfig.create();
+
+
+    @Test
+    public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+    {
+
+        new NonStrictExpectations(){{
+            bitContext.getMetrics(); result = new MetricRegistry("test");
+            bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+            bitContext.getConfig(); result = c;
+        }};
+
+        PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+        PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/multi_record_batch_trace.json"), Charsets.UTF_8));
+        FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+        SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+        while(exec.next()) {
+        }
+
+        if(context.getFailureCause() != null){
+            throw context.getFailureCause();
+        }
+        assertTrue(!context.isFailed());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
new file mode 100644
index 0000000..92faf8d
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.trace;
+
+import static org.junit.Assert.*;
+
+import io.netty.buffer.ByteBufInputStream;
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+import java.nio.channels.FileChannel;
+import java.nio.ByteBuffer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.FileInputStream;
+
+import io.netty.buffer.ByteBuf;
+
+/*
+ * This test uses a simple physical plan with a mock scan that
+ * generates one row. The physical plan also contains a trace
+ * operator, which dumps the records as bytes to the log file.
+ *
+ * The objective of this test is not only to verify that the
+ * injected trace operator dumps its output to the log file, but
+ * also to read the dumped output back and verify that it matches
+ * what we expect. Since our scan produces only one record, we
+ * expect the record count to be one, we expect no selection
+ * vectors, and we know the value of the dumped record
+ * (Integer.MIN_VALUE), so we compare against this known
+ * value.
+ */
+public class TestTraceOutputDump {
+    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTraceOutputDump.class);
+    DrillConfig c = DrillConfig.create();
+
+
+    @Test
+    public void testFilter(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+    {
+
+        new NonStrictExpectations(){{
+            bitContext.getMetrics(); result = new MetricRegistry("test");
+            bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+            bitContext.getConfig(); result = c;
+        }};
+
+        PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+        PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
+        FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+        FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+        SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+        while(exec.next()){
+        }
+
+        if(context.getFailureCause() != null){
+            throw context.getFailureCause();
+        }
+        assertTrue(!context.isFailed());
+
+        FragmentHandle handle = context.getHandle();
+
+        /* Form the file name to which the trace output will dump the record batches */
+        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+        int majorFragmentId = handle.getMajorFragmentId();
+        int minorFragmentId = handle.getMinorFragmentId();
+
+        String logLocation = c.getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+        System.out.println("Found log location: " + logLocation);
+
+        String filename = new String(logLocation + "/" + "mock-scan" + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
+
+        System.out.println("File Name: " + filename);
+
+        File file = new File(filename);
+
+        if (!file.exists())
+            throw new IOException("Trace file not created");
+
+        FileInputStream input = new FileInputStream(file.getAbsoluteFile());
+        FileChannel fc = input.getChannel();
+        int size = (int) fc.size();
+        BufferAllocator allocator = context.getAllocator();
+        ByteBuffer buffer = ByteBuffer.allocate((int) fc.size());
+        ByteBuf buf = allocator.buffer(size);
+
+        int readSize;
+
+        /* Read the file into a ByteBuffer and transfer it into our ByteBuf */
+        while ((readSize = (fc.read(buffer))) > 0)
+        {
+            buffer.position(0).limit(readSize);
+            buf.writeBytes(buffer);
+            buffer.clear();
+        }
+
+        final ByteBufInputStream is = new ByteBufInputStream(buf, buf.readableBytes());
+
+        RecordBatchDef batchDef = RecordBatchDef.parseDelimitedFrom(is);
+
+        /* Assert there are no selection vectors */
+        assertTrue(!batchDef.getIsSelectionVector2());
+
+        /* Assert there is only one record */
+        assertTrue(batchDef.getRecordCount() == 1);
+
+        /* Read the Integer value and ASSERT its Integer.MIN_VALUE */
+        int value = buf.getInt(buf.readerIndex());
+        assertTrue(value == Integer.MIN_VALUE);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception{
+        // pause to get logger to catch up.
+        Thread.sleep(1000);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
index 94d66d4..4c45960 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/vector/TestLoad.java
@@ -66,7 +66,7 @@ public class TestLoad {
       v.getMutator().setValueCount(100);
     }
 
-    WritableBatch writableBatch = WritableBatch.getBatchNoSV(100, vectors);
+    WritableBatch writableBatch = WritableBatch.getBatchNoHV(100, vectors, false);
     RecordBatchLoader batchLoader = new RecordBatchLoader(allocator);
     ByteBuf[] byteBufs = writableBatch.getBuffers();
     int bytes = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json b/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
new file mode 100644
index 0000000..26ddb3d
--- /dev/null
+++ b/exec/java-exec/src/test/resources/trace/multi_record_batch_trace.json
@@ -0,0 +1,49 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+                {records: 100000, types: [
+                  {name: "blue", type: "BIGINT", mode: "REQUIRED"}
+                ]}
+            ]
+        },
+        {
+            pop: "trace",
+            @id: 2,
+            tag: "mock-sub-scan",
+            child: 1
+        },
+        {
+            pop: "filter",
+            @id: 3,
+            child: 2,
+            expr: " (blue) < (1) "
+        },
+        {
+            pop: "trace",
+            @id: 4,
+            tag: "trace-with-sv",
+            child: 3
+        },
+        {
+            pop: "selection-vector-remover",
+            @id: 5,
+            child: 4
+        },
+        {
+            @id: 6,
+            child: 5,
+            pop: "screen"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/exec/java-exec/src/test/resources/trace/simple_trace.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/trace/simple_trace.json b/exec/java-exec/src/test/resources/trace/simple_trace.json
new file mode 100644
index 0000000..8dbea70
--- /dev/null
+++ b/exec/java-exec/src/test/resources/trace/simple_trace.json
@@ -0,0 +1,37 @@
+{
+    head:{
+        type:"APACHE_DRILL_PHYSICAL",
+        version:"1",
+        generator:{
+            type:"manual"
+        }
+    },
+    graph:[
+        {
+            @id:1,
+            pop:"mock-sub-scan",
+            url: "http://apache.org",
+            entries:[
+                {records: 1, types: [
+                  {name: "red", type: "INT", mode: "REQUIRED"}
+                ]}
+            ]
+        }, {
+            @id:2,
+            child: 1,
+            pop:"project",
+            exprs: [
+                { ref: "col1", expr:"red" }
+            ]
+        }, {
+            pop: "trace",
+            @id: 3,
+            child: 2,
+            tag: "mock-scan"
+        }, {
+            @id: 4,
+            child: 3,
+            pop: "screen"
+        }
+    ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index faf67cd..f305c00 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2909,6 +2909,16 @@ public final class UserBitShared {
      * <code>optional int32 record_count = 2;</code>
      */
     int getRecordCount();
+
+    // optional bool isSelectionVector2 = 3;
+    /**
+     * <code>optional bool isSelectionVector2 = 3;</code>
+     */
+    boolean hasIsSelectionVector2();
+    /**
+     * <code>optional bool isSelectionVector2 = 3;</code>
+     */
+    boolean getIsSelectionVector2();
   }
   /**
    * Protobuf type {@code exec.shared.RecordBatchDef}
@@ -2974,6 +2984,11 @@ public final class UserBitShared {
               recordCount_ = input.readInt32();
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000002;
+              isSelectionVector2_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -3069,9 +3084,26 @@ public final class UserBitShared {
       return recordCount_;
     }
 
+    // optional bool isSelectionVector2 = 3;
+    public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+    private boolean isSelectionVector2_;
+    /**
+     * <code>optional bool isSelectionVector2 = 3;</code>
+     */
+    public boolean hasIsSelectionVector2() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional bool isSelectionVector2 = 3;</code>
+     */
+    public boolean getIsSelectionVector2() {
+      return isSelectionVector2_;
+    }
+
     private void initFields() {
       field_ = java.util.Collections.emptyList();
       recordCount_ = 0;
+      isSelectionVector2_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3091,6 +3123,9 @@ public final class UserBitShared {
       if (((bitField0_ & 0x00000001) == 0x00000001)) {
         output.writeInt32(2, recordCount_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(3, isSelectionVector2_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -3108,6 +3143,10 @@ public final class UserBitShared {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(2, recordCount_);
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(3, isSelectionVector2_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3233,6 +3272,8 @@ public final class UserBitShared {
         }
         recordCount_ = 0;
         bitField0_ = (bitField0_ & ~0x00000002);
+        isSelectionVector2_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
 
@@ -3274,6 +3315,10 @@ public final class UserBitShared {
           to_bitField0_ |= 0x00000001;
         }
         result.recordCount_ = recordCount_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.isSelectionVector2_ = isSelectionVector2_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3319,6 +3364,9 @@ public final class UserBitShared {
         if (other.hasRecordCount()) {
           setRecordCount(other.getRecordCount());
         }
+        if (other.hasIsSelectionVector2()) {
+          setIsSelectionVector2(other.getIsSelectionVector2());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3619,6 +3667,39 @@ public final class UserBitShared {
         return this;
       }
 
+      // optional bool isSelectionVector2 = 3;
+      private boolean isSelectionVector2_ ;
+      /**
+       * <code>optional bool isSelectionVector2 = 3;</code>
+       */
+      public boolean hasIsSelectionVector2() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional bool isSelectionVector2 = 3;</code>
+       */
+      public boolean getIsSelectionVector2() {
+        return isSelectionVector2_;
+      }
+      /**
+       * <code>optional bool isSelectionVector2 = 3;</code>
+       */
+      public Builder setIsSelectionVector2(boolean value) {
+        bitField0_ |= 0x00000004;
+        isSelectionVector2_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool isSelectionVector2 = 3;</code>
+       */
+      public Builder clearIsSelectionVector2() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        isSelectionVector2_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:exec.shared.RecordBatchDef)
     }
 
@@ -4903,15 +4984,16 @@ public final class UserBitShared {
       "ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
       "ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
       "art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
-      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"Q\n\016RecordBatch",
+      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
       "Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
-      "tadata\022\024\n\014record_count\030\002 \001(\005\"\261\001\n\rFieldMe" +
-      "tadata\022\033\n\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013v" +
-      "alue_count\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(" +
-      "\005\022\023\n\013group_count\030\004 \001(\005\022\025\n\rbuffer_length\030" +
-      "\005 \001(\005\022)\n\005child\030\006 \003(\0132\032.exec.shared.Field" +
-      "MetadataB.\n\033org.apache.drill.exec.protoB" +
-      "\rUserBitSharedH\001"
+      "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
+      "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
+      "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
+      " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
+      "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
+      "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
+      "org.apache.drill.exec.protoB\rUserBitShar" +
+      "edH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4947,7 +5029,7 @@ public final class UserBitShared {
           internal_static_exec_shared_RecordBatchDef_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_shared_RecordBatchDef_descriptor,
-              new java.lang.String[] { "Field", "RecordCount", });
+              new java.lang.String[] { "Field", "RecordCount", "IsSelectionVector2", });
           internal_static_exec_shared_FieldMetadata_descriptor =
             getDescriptor().getMessageTypes().get(5);
           internal_static_exec_shared_FieldMetadata_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
new file mode 100644
index 0000000..749b3d5
--- /dev/null
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.proto.helper;
+
+import java.util.UUID;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/* Helper class around the QueryId protobuf */
+public class QueryIdHelper {
+
+    /* Generate a UUID from the two parts of the queryid */
+    public static String getQueryId(QueryId queryId)
+    {
+        return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
+    }
+}
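
For context, a minimal usage sketch of the helper; the values are illustrative, and the setters are the generated protobuf builder methods implied by the getPart1()/getPart2() calls above:

    // Build a QueryId and render it as a human-readable UUID string.
    QueryId queryId = QueryId.newBuilder()
        .setPart1(0x1122334455667788L)
        .setPart2(0x99aabbccddeeff00L)  // UUID takes the raw bits, so a negative long is fine
        .build();
    String s = QueryIdHelper.getQueryId(queryId);  // "11223344-5566-7788-99aa-bbccddeeff00"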

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6c78890a/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 7e2506c..5bea284 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,8 @@ message RecordBatch{
 message RecordBatchDef {
 	repeated FieldMetadata field = 1;
 	optional int32 record_count = 2;
-	
+	optional bool isSelectionVector2 = 3;
+
 }
 
 message FieldMetadata {
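
A short sketch of how the new flag is meant to be consumed through the generated Java API shown earlier in this patch (values are illustrative):

    // Writer side: mark that a 2-byte selection vector accompanies the batch.
    RecordBatchDef def = RecordBatchDef.newBuilder()
        .setRecordCount(4)
        .setIsSelectionVector2(true)  // the new optional bool, tag 3
        .build();

    // Reader side: the field is optional, so check presence before trusting it.
    if (def.hasIsSelectionVector2() && def.getIsSelectionVector2()) {
      // expect recordCount * 2 bytes of SelectionVector2 data after the metadata
    }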


[04/10] git commit: DRILL-271 refactor DistributedCache code. Uses Hazelcast 3.1 and custom serialization.

Posted by ja...@apache.org.
DRILL-271 refactor DistributedCache code. Uses Hazelcast 3.1 and custom serialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d529352e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d529352e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d529352e

Branch: refs/heads/master
Commit: d529352e03410e1612db691677dc90c855342c01
Parents: 266d248
Author: Steven Phillips <sp...@maprtech.com>
Authored: Tue Oct 29 20:24:00 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:38 2013 -0700

----------------------------------------------------------------------
 exec/java-exec/pom.xml                          |   2 +-
 .../exec/cache/HCDrillSerializableWrapper.java  |  63 -------
 .../cache/HCSerializableWrapperClasses.java     |  31 ----
 .../cache/HCVectorAccessibleSerializer.java     |  56 ++++++
 .../org/apache/drill/exec/cache/HazelCache.java |  37 ++--
 .../org/apache/drill/exec/cache/LocalCache.java |  12 +-
 .../apache/drill/exec/cache/ProtoBufWrap.java   |   8 +-
 .../cache/VectorAccessibleSerializable.java     | 184 ++++++++++++++++++
 .../exec/cache/VectorContainerSerializable.java | 186 -------------------
 .../OrderedPartitionRecordBatch.java            |  20 +-
 .../org/apache/drill/exec/server/Drillbit.java  |   6 +-
 .../drill/exec/cache/TestVectorCache.java       |   8 +-
 .../drill/exec/cache/TestWriteToDisk.java       | 108 +++++++++++
 13 files changed, 403 insertions(+), 318 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 063e60e..c5b169d 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -185,7 +185,7 @@
     <dependency>
       <groupId>com.hazelcast</groupId>
       <artifactId>hazelcast</artifactId>
-      <version>2.5.1</version>
+      <version>3.1</version>
     </dependency>
     <dependency>
       <groupId>org.codehaus.janino</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
deleted file mode 100644
index 3f2c41c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCDrillSerializableWrapper.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.hazelcast.nio.DataSerializable;
-
-import java.io.*;
-
-/**
- * Wraps a DrillSerializable object. Objects of this class can be put in the HazelCast implementation of Distributed Cache
- */
-public abstract class HCDrillSerializableWrapper implements DataSerializable {
-
-  private DrillSerializable obj;
-
-  public HCDrillSerializableWrapper() {}
-
-  public HCDrillSerializableWrapper(DrillSerializable obj) {
-    this.obj = obj;
-  }
-
-  public void readData(DataInput in) throws IOException {
-    obj.read(in);
-  }
-
-  public void writeData(DataOutput out) throws IOException {
-    obj.write(out);
-  }
-
-  public DrillSerializable get() {
-    return obj;
-  }
-
-  /**
-   *  This is a method that will get a Class specific implementation of HCDrillSerializableWrapper. Class specific implentations
-   *  are necessary because Hazel Cast requires object that have constructors with no parameters.
-   * @param value
-   * @param clazz
-   * @return
-   */
-  public static HCDrillSerializableWrapper getWrapper(DrillSerializable value, Class clazz) {
-    if (clazz.equals(VectorContainerSerializable.class)) {
-      return new HCSerializableWrapperClasses.HCVectorListSerializable(value);
-    } else {
-      throw new UnsupportedOperationException("HCDrillSerializableWrapper not implemented for " + clazz);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
deleted file mode 100644
index d22723a..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCSerializableWrapperClasses.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-public class HCSerializableWrapperClasses {
-  public static class HCVectorListSerializable extends HCDrillSerializableWrapper {
-
-    public HCVectorListSerializable() {
-      super(new VectorContainerSerializable());
-    }
-
-    public HCVectorListSerializable(DrillSerializable obj) {
-      super(obj);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
new file mode 100644
index 0000000..0d5ba96
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HCVectorAccessibleSerializer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.StreamSerializer;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import java.io.*;
+
+/**
+ * Hazelcast StreamSerializer that reads and writes VectorAccessibleSerializable objects so they can be stored in the Hazelcast implementation of the distributed cache.
+ */
+public class HCVectorAccessibleSerializer implements StreamSerializer<VectorAccessibleSerializable> {
+
+  private BufferAllocator allocator;
+
+  public HCVectorAccessibleSerializer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  public VectorAccessibleSerializable read(ObjectDataInput in) throws IOException {
+    VectorAccessibleSerializable va = new VectorAccessibleSerializable(allocator);
+    va.readFromStream(DataInputInputStream.constructInputStream(in));
+    return va;
+  }
+
+  public void write(ObjectDataOutput out, VectorAccessibleSerializable va) throws IOException {
+    va.writeToStream(DataOutputOutputStream.constructOutputStream(out));
+  }
+
+  public void destroy() {}
+
+  public int getTypeId() {
+    return 1;
+  }
+}
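
Wiring this serializer into a Hazelcast 3.1 instance looks like the following sketch. It mirrors the HazelCache.run() change below, except that it uses the stock Hazelcast.newHazelcastInstance() call rather than Drill's getInstanceOrCreateNew() helper, and it assumes a BufferAllocator named allocator is already in hand:

    Config c = new Config();
    SerializerConfig sc = new SerializerConfig()
        .setImplementation(new HCVectorAccessibleSerializer(allocator))
        .setTypeClass(VectorAccessibleSerializable.class);
    c.getSerializationConfig().addSerializerConfig(sc);
    HazelcastInstance instance = Hazelcast.newHazelcastInstance(c);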

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
index 577dfeb..9dd4373 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/HazelCache.java
@@ -22,12 +22,15 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.hazelcast.config.SerializerConfig;
 import com.hazelcast.core.*;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.ProtoBufImpl.HWorkQueueStatus;
 import org.apache.drill.exec.cache.ProtoBufImpl.HandlePlan;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
@@ -35,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.hazelcast.config.Config;
+import org.apache.drill.exec.server.DrillbitContext;
 
 public class HazelCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HazelCache.class);
@@ -44,9 +48,11 @@ public class HazelCache implements DistributedCache {
   private ITopic<HWorkQueueStatus> workQueueLengths;
   private HandlePlan fragments;
   private Cache<WorkQueueStatus, Integer>  endpoints;
-  
-  public HazelCache(DrillConfig config) {
+  private BufferAllocator allocator;
+
+  public HazelCache(DrillConfig config, BufferAllocator allocator) {
     this.instanceName = config.getString(ExecConstants.SERVICE_NAME);
+    this.allocator = allocator;
   }
 
   private class Listener implements MessageListener<HWorkQueueStatus>{
@@ -61,7 +67,10 @@ public class HazelCache implements DistributedCache {
   
   public void run() {
     Config c = new Config();
+    SerializerConfig sc = new SerializerConfig().setImplementation(new HCVectorAccessibleSerializer(allocator))
+            .setTypeClass(VectorAccessibleSerializable.class);
     c.setInstanceName(instanceName);
+    c.getSerializationConfig().addSerializerConfig(sc);
     instance = getInstanceOrCreateNew(c);
     workQueueLengths = instance.getTopic("queue-length");
     fragments = new HandlePlan(instance);
@@ -120,11 +129,11 @@ public class HazelCache implements DistributedCache {
 
   @Override
   public Counter getCounter(String name) {
-    return new HCCounterImpl(this.instance.getAtomicNumber(name));
+    return new HCCounterImpl(this.instance.getAtomicLong(name));
   }
 
   public static class HCDistributedMapImpl<V> implements DistributedMap<V> {
-    private IMap<String, HCDrillSerializableWrapper> m;
+    private IMap<String, DrillSerializable> m;
     private Class<V> clazz;
 
     public HCDistributedMapImpl(IMap m, Class<V> clazz) {
@@ -133,24 +142,24 @@ public class HazelCache implements DistributedCache {
     }
 
     public DrillSerializable get(String key) {
-      return m.get(key).get();
+      return m.get(key);
     }
 
     public void put(String key, DrillSerializable value) {
-      m.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      m.put(key, value);
     }
 
     public void putIfAbsent(String key, DrillSerializable value) {
-      m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      m.putIfAbsent(key, value);
     }
 
     public void putIfAbsent(String key, DrillSerializable value, long ttl, TimeUnit timeunit) {
-      m.putIfAbsent(key, HCDrillSerializableWrapper.getWrapper(value, clazz), ttl, timeunit);
+      m.putIfAbsent(key, value, ttl, timeunit);
     }
   }
 
   public static class HCDistributedMultiMapImpl<V> implements DistributedMultiMap<V> {
-    private com.hazelcast.core.MultiMap<String, HCDrillSerializableWrapper> mmap;
+    private com.hazelcast.core.MultiMap<String, DrillSerializable> mmap;
     private Class<V> clazz;
 
     public HCDistributedMultiMapImpl(com.hazelcast.core.MultiMap mmap, Class<V> clazz) {
@@ -160,22 +169,22 @@ public class HazelCache implements DistributedCache {
 
     public Collection<DrillSerializable> get(String key) {
       List<DrillSerializable> list = Lists.newArrayList();
-      for (HCDrillSerializableWrapper v : mmap.get(key)) {
-        list.add(v.get());
+      for (DrillSerializable v : mmap.get(key)) {
+        list.add(v);
       }
       return list;
     }
 
     @Override
     public void put(String key, DrillSerializable value) {
-      mmap.put(key, HCDrillSerializableWrapper.getWrapper(value, clazz));
+      mmap.put(key, value);
     }
   }
 
   public static class HCCounterImpl implements Counter {
-    private AtomicNumber n;
+    private IAtomicLong n;
 
-    public HCCounterImpl(AtomicNumber n) {
+    public HCCounterImpl(IAtomicLong n) {
       this.n = n;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
index 7ad6ec6..e6275c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/LocalCache.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.cache;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,15 @@ import com.google.common.collect.Lists;
 import com.google.common.io.ByteArrayDataInput;
 import com.google.common.io.ByteArrayDataOutput;
 import com.google.common.io.ByteStreams;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.ExecProtos.PlanFragment;
 
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.server.DrillbitContext;
 
 public class LocalCache implements DistributedCache {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalCache.class);
@@ -44,7 +49,8 @@ public class LocalCache implements DistributedCache {
   private volatile ConcurrentMap<Class, DistributedMap> maps;
   private volatile ConcurrentMap<Class, DistributedMultiMap> multiMaps;
   private volatile ConcurrentMap<String, Counter> counters;
-  
+  private static final BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
   @Override
   public void close() throws IOException {
     handles = null;
@@ -116,10 +122,10 @@ public class LocalCache implements DistributedCache {
   public static DrillSerializable deserialize(byte[] bytes, Class clazz) {
     ByteArrayDataInput in = ByteStreams.newDataInput(bytes);
     try {
-      DrillSerializable obj = (DrillSerializable)clazz.newInstance();
+      DrillSerializable obj = (DrillSerializable)clazz.getConstructor(BufferAllocator.class).newInstance(allocator);
       obj.read(in);
       return obj;
-    } catch (InstantiationException | IllegalAccessException | IOException e) {
+    } catch (InstantiationException | IllegalAccessException | IOException | NoSuchMethodException | InvocationTargetException e) {
       throw new RuntimeException(e);
     }
   }
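
The deserialize() change above introduces an implicit contract: every class stored through LocalCache must expose a public constructor taking a single BufferAllocator, or the reflective lookup fails with NoSuchMethodException. A hypothetical conforming class (assuming DrillSerializable declares just the four stream methods overridden in VectorAccessibleSerializable below):

    import java.io.*;
    import org.apache.drill.exec.cache.DrillSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;

    public class CachedThing implements DrillSerializable {
      private final BufferAllocator allocator;

      public CachedThing(BufferAllocator allocator) {  // matched by clazz.getConstructor(BufferAllocator.class)
        this.allocator = allocator;
      }

      @Override public void read(DataInput in) throws IOException { /* no-op for the sketch */ }
      @Override public void readFromStream(InputStream in) throws IOException { /* no-op for the sketch */ }
      @Override public void write(DataOutput out) throws IOException { /* no-op for the sketch */ }
      @Override public void writeToStream(OutputStream out) throws IOException { /* no-op for the sketch */ }
    }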

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
index 4aea645..448eecd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/ProtoBufWrap.java
@@ -21,11 +21,13 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
 import org.apache.drill.exec.proto.ExecProtos.WorkQueueStatus;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
-import com.hazelcast.nio.DataSerializable;
+import com.hazelcast.nio.serialization.DataSerializable;
 
 public abstract class ProtoBufWrap<T extends MessageLite> implements DataSerializable{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtoBufWrap.class);
@@ -43,7 +45,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
   }
   
   @Override
-  public void readData(DataInput arg0) throws IOException {
+  public void readData(ObjectDataInput arg0) throws IOException {
     int len = arg0.readShort();
     byte[] b = new byte[len];
     arg0.readFully(b);
@@ -51,7 +53,7 @@ public abstract class ProtoBufWrap<T extends MessageLite> implements DataSeriali
   }
 
   @Override
-  public void writeData(DataOutput arg0) throws IOException {
+  public void writeData(ObjectDataOutput arg0) throws IOException {
     byte[] b = value.toByteArray();
     if (b.length > Short.MAX_VALUE) throw new IOException("Unexpectedly long value.");
     arg0.writeShort(b.length);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
new file mode 100644
index 0000000..24387d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.util.DataInputInputStream;
+import org.apache.drill.common.util.DataOutputOutputStream;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
+
+import java.io.*;
+import java.util.List;
+
+public class VectorAccessibleSerializable implements DrillSerializable {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
+
+  private VectorAccessible va;
+  private BufferAllocator allocator;
+  private int recordCount = -1;
+  private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+  private SelectionVector2 sv2;
+
+  /**
+   *
+   * @param va
+   */
+  public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
+    this.va = va;
+    this.allocator = allocator;
+  }
+
+  public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
+    this.va = va;
+    this.allocator = allocator;
+    this.sv2 = sv2;
+    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+  }
+
+  public VectorAccessibleSerializable(BufferAllocator allocator) {
+    this.va = new VectorContainer();
+    this.allocator = allocator;
+  }
+
+  @Override
+  public void read(DataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+  
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    VectorContainer container = new VectorContainer();
+    UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+      sv2.allocateNew(recordCount * 2);
+      sv2.getBuffer().setBytes(0, input, recordCount * 2);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
+    List<ValueVector> vectorList = Lists.newArrayList();
+    List<FieldMetadata> fieldList = batchDef.getFieldList();
+    for (FieldMetadata metaData : fieldList) {
+      int dataLength = metaData.getBufferLength();
+      byte[] bytes = new byte[dataLength];
+      input.read(bytes);
+      MaterializedField field = MaterializedField.create(metaData.getDef());
+      ByteBuf buf = allocator.buffer(dataLength);
+      buf.setBytes(0, bytes);
+      ValueVector vector = TypeHelper.getNewVector(field, allocator);
+      vector.load(metaData, buf);
+      vectorList.add(vector);
+    }
+    container.addCollection(vectorList);
+    container.buildSchema(svMode);
+    container.setRecordCount(recordCount);
+    va = container;
+  }
+
+  @Override
+  public void write(DataOutput output) throws IOException {
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+    ByteBuf[] incomingBuffers = batch.getBuffers();
+    UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+        /* ByteBuf associated with the selection vector */
+    ByteBuf svBuf = null;
+
+        /* Size of the selection vector */
+    int svCount = 0;
+
+    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+    {
+      svCount = sv2.getCount();
+      svBuf = sv2.getBuffer();
+    }
+
+    int totalBufferLength = 0;
+
+    try
+    {
+            /* Write the metadata to the file */
+      batchDef.writeDelimitedTo(output);
+
+            /* If we have a selection vector, dump it to file first */
+      if (svBuf != null)
+      {
+
+                /* For writing to the selection vectors we use
+                 * setChar() method which does not modify the
+                 * reader and writer index. To copy the entire buffer
+                 * without having to get each byte individually we need
+                 * to set the writer index
+                 */
+        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+//        fc.write(svBuf.nioBuffers());
+        svBuf.getBytes(0, output, svBuf.readableBytes());
+        svBuf.release();
+      }
+
+            /* Dump the array of ByteBufs associated with the value vectors */
+      for (ByteBuf buf : incomingBuffers)
+      {
+                /* dump the buffer into the file channel */
+        int bufLength = buf.readableBytes();
+        buf.getBytes(0, output, bufLength);
+
+                /* compute total length of buffer, will be used when
+                 * we create a compound buffer
+                 */
+        totalBufferLength += buf.readableBytes();
+        buf.release();
+      }
+
+      output.flush();
+      context.stop();
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    } finally {
+      clear();
+    }
+  }
+
+  private void clear() {
+  }
+
+  public VectorAccessible get() {
+    return va;
+  }
+}
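
Read together, writeToStream() and readFromStream() define a simple framing for each batch on the stream: first a varint-delimited RecordBatchDef (writeDelimitedTo/parseDelimitedFrom), then, only when isSelectionVector2 is set, recordCount * 2 bytes of SelectionVector2 data, and finally the value-vector buffers in field order, each buffer's length coming from FieldMetadata.getBufferLength(). Note that readFromStream() dereferences sv2 when the flag is set, so the single-argument constructor is only suitable for reading batches written without a selection vector.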

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
deleted file mode 100644
index 5813dd6..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.cache;
-
-import com.google.common.collect.Lists;
-import com.yammer.metrics.MetricRegistry;
-import com.yammer.metrics.Timer;
-import io.netty.buffer.ByteBuf;
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.util.DataInputInputStream;
-import org.apache.drill.common.util.DataOutputOutputStream;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.*;
-import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.vector.BaseDataValueVector;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
-
-import java.io.*;
-import java.util.List;
-
-public class VectorContainerSerializable implements DrillSerializable {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
-  static final MetricRegistry metrics = DrillMetrics.getInstance();
-  static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
-
-  private VectorAccessible va;
-  private BootStrapContext context;
-  private int recordCount = -1;
-  private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
-  private SelectionVector2 sv2;
-
-  /**
-   *
-   * @param va
-   */
-  public VectorContainerSerializable(VectorAccessible va){
-    this.va = va;
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-  }
-
-  public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
-    this.va = va;
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-    this.sv2 = sv2;
-    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-  }
-
-  public VectorContainerSerializable() {
-    this.va = new VectorContainer();
-    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
-  }
-
-  @Override
-  public void read(DataInput input) throws IOException {
-    readFromStream(DataInputInputStream.constructInputStream(input));
-  }
-  
-  @Override
-  public void readFromStream(InputStream input) throws IOException {
-    VectorContainer container = new VectorContainer();
-    UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
-    recordCount = batchDef.getRecordCount();
-    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
-      sv2.allocateNew(recordCount * 2);
-      sv2.getBuffer().setBytes(0, input, recordCount * 2);
-      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-    }
-    List<ValueVector> vectorList = Lists.newArrayList();
-    List<FieldMetadata> fieldList = batchDef.getFieldList();
-    for (FieldMetadata metaData : fieldList) {
-      int dataLength = metaData.getBufferLength();
-      byte[] bytes = new byte[dataLength];
-      input.read(bytes);
-      MaterializedField field = MaterializedField.create(metaData.getDef());
-      ByteBuf buf = context.getAllocator().buffer(dataLength);
-      buf.setBytes(0, bytes);
-      ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
-      vector.load(metaData, buf);
-      vectorList.add(vector);
-    }
-    container.addCollection(vectorList);
-    container.buildSchema(svMode);
-    container.setRecordCount(recordCount);
-    va = container;
-  }
-
-  @Override
-  public void write(DataOutput output) throws IOException {
-    writeToStream(DataOutputOutputStream.constructOutputStream(output));
-  }
-
-  @Override
-  public void writeToStream(OutputStream output) throws IOException {
-    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
-
-    ByteBuf[] incomingBuffers = batch.getBuffers();
-    UserBitShared.RecordBatchDef batchDef = batch.getDef();
-
-        /* ByteBuf associated with the selection vector */
-    ByteBuf svBuf = null;
-
-        /* Size of the selection vector */
-    int svCount = 0;
-
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
-    {
-      svCount = sv2.getCount();
-      svBuf = sv2.getBuffer();
-    }
-
-    int totalBufferLength = 0;
-
-    try
-    {
-            /* Write the metadata to the file */
-      batchDef.writeDelimitedTo(output);
-
-            /* If we have a selection vector, dump it to file first */
-      if (svBuf != null)
-      {
-
-                /* For writing to the selection vectors we use
-                 * setChar() method which does not modify the
-                 * reader and writer index. To copy the entire buffer
-                 * without having to get each byte individually we need
-                 * to set the writer index
-                 */
-        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-//        fc.write(svBuf.nioBuffers());
-        svBuf.getBytes(0, output, svBuf.readableBytes());
-        svBuf.release();
-      }
-
-            /* Dump the array of ByteBuf's associated with the value vectors */
-      for (ByteBuf buf : incomingBuffers)
-      {
-                /* dump the buffer into the file channel */
-        int bufLength = buf.readableBytes();
-        buf.getBytes(0, output, bufLength);
-
-                /* compute total length of buffer, will be used when
-                 * we create a compound buffer
-                 */
-        totalBufferLength += buf.readableBytes();
-        buf.release();
-      }
-
-      output.flush();
-      context.stop();
-    } catch (IOException e)
-    {
-      throw new RuntimeException(e);
-    } finally {
-      clear();
-    }
-  }
-
-  private void clear() {
-  }
-
-  public VectorAccessible get() {
-    return va;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index ef3886c..e39b82e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -82,7 +82,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
   private boolean startedUnsampledBatches = false;
   private boolean upstreamNone = false;
   private int recordCount;
-  private DistributedMap<VectorContainerSerializable> tableMap;
+  private DistributedMap<VectorAccessibleSerializable> tableMap;
   private DistributedMultiMap mmap;
   private String mapKey;
 
@@ -154,13 +154,13 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
       DistributedCache cache = context.getDrillbitContext().getCache();
       mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
-      mmap = cache.getMultiMap(VectorContainerSerializable.class);
+      mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
       List<ValueVector> vectorList = Lists.newArrayList();
       for (VectorWrapper vw : containerToCache) {
         vectorList.add(vw.getValueVector());
       }
 
-      VectorContainerSerializable wrap = new VectorContainerSerializable(containerToCache);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
 
       mmap.put(mapKey, wrap);
       wrap = null;
@@ -169,24 +169,24 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
       long val = minorFragmentSampleCount.incrementAndGet();
       logger.debug("Incremented mfsc, got {}", val);
-      tableMap = cache.getMap(VectorContainerSerializable.class);
+      tableMap = cache.getMap(VectorAccessibleSerializable.class);
       Preconditions.checkNotNull(tableMap);
 
       if (val == Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
         buildTable();
-        wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+        wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
       } else if (val < Math.ceil(sendingMajorFragmentWidth * completionFactor)) {
         // Wait until sufficient number of fragments have submitted samples, or proceed after 100 ms passed
         for (int i = 0; i < 100 && wrap == null; i++) {
           Thread.sleep(10);
-          wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+          wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
           if (i == 99) {
             buildTable();
-            wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+            wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
           }
         }
       } else {
-        wrap = (VectorContainerSerializable)tableMap.get(mapKey + "final");
+        wrap = (VectorAccessibleSerializable)tableMap.get(mapKey + "final");
       }
 
       Preconditions.checkState(wrap != null);
@@ -211,7 +211,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
 
     SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES, allSamplesContainer);
     for (DrillSerializable w : allSamplesWrap) {
-      containerBuilder.add(((VectorContainerSerializable)w).get());
+      containerBuilder.add(((VectorAccessibleSerializable)w).get());
     }
     containerBuilder.build(context);
 
@@ -239,7 +239,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     }
     candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
 
-    VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
 
     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 052f415..49732d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -70,7 +70,7 @@ public class Drillbit implements Closeable{
   final DistributedCache cache;
   final WorkManager manager;
   final BootStrapContext context;
-  
+
   private volatile RegistrationHandle handle;
 
   public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
@@ -86,15 +86,15 @@ public class Drillbit implements Closeable{
       this.manager = new WorkManager(context);
       this.coord = new ZKClusterCoordinator(config);
       this.engine = new ServiceEngine(manager.getBitComWorker(), manager.getUserWorker(), context);
-      this.cache = new HazelCache(config);
+      this.cache = new HazelCache(config, context.getAllocator());
     }
   }
 
   public void run() throws Exception {
     coord.start(10000);
     DrillbitEndpoint md = engine.start();
-    cache.run();
     manager.start(md, cache, engine.getBitCom(), coord);
+    cache.run();
     handle = coord.register(md);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index ffc0274..94aa3dd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -46,7 +46,7 @@ public class TestVectorCache {
     Drillbit bit = new Drillbit(config, serviceSet);
     bit.run();
     DrillbitContext context = bit.getContext();
-    HazelCache cache = new HazelCache(config);
+    HazelCache cache = new HazelCache(config, context.getAllocator());
     cache.run();
 
     MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
@@ -68,11 +68,11 @@ public class TestVectorCache {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorContainerSerializable wrap = new VectorContainerSerializable(container);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
 
-    DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
+    DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
     mmap.put("vectors", wrap);
-    VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
+    VectorAccessibleSerializable newWrap = (VectorAccessibleSerializable)mmap.get("vectors").iterator().next();
 
     VectorAccessible newContainer = newWrap.get();
     for (VectorWrapper w : newContainer) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d529352e/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
new file mode 100644
index 0000000..11d15d8
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.cache;
+
+import com.beust.jcommander.internal.Lists;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.IntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VarBinaryVector;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestWriteToDisk {
+
+  @Test
+  public void test() throws Exception {
+    List<ValueVector> vectorList = Lists.newArrayList();
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+    DrillConfig config = DrillConfig.create();
+    Drillbit bit = new Drillbit(config, serviceSet);
+    bit.run();
+    DrillbitContext context = bit.getContext();
+
+    MaterializedField intField = MaterializedField.create(new SchemaPath("int", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.INT));
+    IntVector intVector = (IntVector)TypeHelper.getNewVector(intField, context.getAllocator());
+    MaterializedField binField = MaterializedField.create(new SchemaPath("binary", ExpressionPosition.UNKNOWN), Types.required(TypeProtos.MinorType.VARBINARY));
+    VarBinaryVector binVector = (VarBinaryVector)TypeHelper.getNewVector(binField, context.getAllocator());
+    AllocationHelper.allocate(intVector, 4, 4);
+    AllocationHelper.allocate(binVector, 4, 5);
+    vectorList.add(intVector);
+    vectorList.add(binVector);
+
+    intVector.getMutator().set(0, 0); binVector.getMutator().set(0, "ZERO".getBytes());
+    intVector.getMutator().set(1, 1); binVector.getMutator().set(1, "ONE".getBytes());
+    intVector.getMutator().set(2, 2); binVector.getMutator().set(2, "TWO".getBytes());
+    intVector.getMutator().set(3, 3); binVector.getMutator().set(3, "THREE".getBytes());
+    intVector.getMutator().setValueCount(4);
+    binVector.getMutator().setValueCount(4);
+
+    VectorContainer container = new VectorContainer();
+    container.addCollection(vectorList);
+    container.setRecordCount(4);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+
+    Configuration conf = new Configuration();
+    conf.set("fs.name.default", "file:///");
+    FileSystem fs = FileSystem.get(conf);
+    Path path = new Path("/tmp/drillSerializable");
+    if (fs.exists(path)) fs.delete(path, false);
+    FSDataOutputStream out = fs.create(path);
+
+    wrap.writeToStream(out);
+    out.close();
+
+    FSDataInputStream in = fs.open(path);
+    VectorAccessibleSerializable newWrap = new VectorAccessibleSerializable(context.getAllocator());
+    newWrap.readFromStream(in);
+    fs.close();
+
+    VectorAccessible newContainer = newWrap.get();
+    for (VectorWrapper w : newContainer) {
+      ValueVector vv = w.getValueVector();
+      int values = vv.getAccessor().getValueCount();
+      for (int i = 0; i < values; i++) {
+        Object o = vv.getAccessor().getObject(i);
+        if (o instanceof byte[]) {
+          System.out.println(new String((byte[])o));
+        } else {
+          System.out.println(o);
+        }
+      }
+    }
+  }
+}
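
A condensed round-trip sketch of what the test exercises, assuming a populated VectorContainer named container and an allocator obtained from a running DrillbitContext. It uses Hadoop's stock fs.default.name key; the fs.name.default spelling used above is not a key Hadoop recognizes, so the local file:/// default applies there either way:

    Configuration conf = new Configuration();
    conf.set("fs.default.name", "file:///");  // standard Hadoop 1.x key for the default filesystem
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path("/tmp/drillSerializableSketch");

    VectorAccessibleSerializable out = new VectorAccessibleSerializable(container, allocator);
    try (FSDataOutputStream os = fs.create(path)) {
      out.writeToStream(os);  // delimited RecordBatchDef, then the vector buffers
    }

    VectorAccessibleSerializable in = new VectorAccessibleSerializable(allocator);
    try (FSDataInputStream is = fs.open(path)) {
      in.readFromStream(is);  // rebuilds a VectorContainer from the stream
    }
    VectorAccessible roundTripped = in.get();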


[08/10] git commit: Simple reformatting

Posted by ja...@apache.org.
Simple reformatting


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8a571b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8a571b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8a571b3e

Branch: refs/heads/master
Commit: 8a571b3ea04f439b3303d7154b1450abcb2bf320
Parents: b44b6c7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri Nov 8 09:20:28 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 09:20:28 2013 -0800

----------------------------------------------------------------------
 .../physical/impl/trace/TraceBatchCreator.java  |  16 +-
 .../physical/impl/trace/TraceRecordBatch.java   | 224 +++++++++----------
 .../apache/drill/exec/record/WritableBatch.java |  52 +++--
 3 files changed, 138 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index e857c25..a24ec70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -29,13 +29,13 @@ import com.google.common.base.Preconditions;
 import java.util.List;
 
 public class TraceBatchCreator implements BatchCreator<Trace> {
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
-    @Override
-    public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
-        //Preconditions.checkArgument(children.size() == 1);
-        return new TraceRecordBatch(config, children.iterator().next(), context);
-    }
-
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+  @Override
+  public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+      throws ExecutionSetupException {
+    // Preconditions.checkArgument(children.size() == 1);
+    return new TraceRecordBatch(config, children.iterator().next(), context);
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index b73ddc1..1b990c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -65,139 +65,125 @@ import org.apache.hadoop.fs.Path;
  * same set of value vectors (and selection vectors) to its parent record
  * batch
  */
-public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
-{
-    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
-
-    private SelectionVector2 sv = null;
-
-    /* Tag associated with each trace operator */
-    final String traceTag;
-
-    /* Location where the log should be dumped */
-    private final String logLocation;
-
-    /* File descriptors needed to be able to dump to log file */
-    private OutputStream fos;
-
-    public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
-    {
-        super(pop, context, incoming);
-        this.traceTag = pop.traceTag;
-        logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
-        String fileName = getFileName();
-
-        /* Create the log file we will dump to and initialize the file descriptors */
-        try
-        {
-          Configuration conf = new Configuration();
-          conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
-          FileSystem fs = FileSystem.get(conf);
-
-            /* create the file */
-          fos = fs.create(new Path(fileName));
-        } catch (IOException e)
-        {
-            logger.error("Unable to create file: " + fileName);
-        }
-    }
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+  private SelectionVector2 sv = null;
+
+  /* Tag associated with each trace operator */
+  final String traceTag;
+
+  /* Location where the log should be dumped */
+  private final String logLocation;
+
+  /* File descriptors needed to be able to dump to log file */
+  private OutputStream fos;
+
+  public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+    super(pop, context, incoming);
+    this.traceTag = pop.traceTag;
+    logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+    String fileName = getFileName();
+
+    /* Create the log file we will dump to and initialize the file descriptors */
+    try {
+      Configuration conf = new Configuration();
+      conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+      FileSystem fs = FileSystem.get(conf);
 
-    @Override
-    public int getRecordCount()
-    {
-        if (sv == null)
-            return incoming.getRecordCount();
-        else
-           return sv.getCount();
+      /* create the file */
+      fos = fs.create(new Path(fileName));
+    } catch (IOException e) {
+      logger.error("Unable to create file: " + fileName);
     }
+  }
+
+  @Override
+  public int getRecordCount() {
+    if (sv == null)
+      return incoming.getRecordCount();
+    else
+      return sv.getCount();
+  }
+
+  /**
+   * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
+   * this record batch to a log file.
+   */
+  @Override
+  protected void doWork() {
+
+    boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+    if (incomingHasSv2) {
+      sv = incoming.getSelectionVector2();
+    } else {
+      sv = null;
+    }
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
+        : false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
+
+    try {
+      wrap.writeToStreamAndRetain(fos);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    batch.reconstructContainer(container);
+  }
 
-    /**
-     * Function is invoked for every record batch and it simply
-     * dumps the buffers associated with all the value vectors in
-     * this record batch to a log file.
+  @Override
+  protected void setupNewSchema() throws SchemaChangeException {
+    /* Trace operator does not deal with hyper vectors yet */
+    if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+      throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+    /*
+     * we have a new schema, clear our existing container to load the new value vectors
      */
-    @Override
-    protected void doWork()
-    {
-
-      boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
-      if (incomingHasSv2) {
-        sv = incoming.getSelectionVector2();
-      } else {
-        sv = null;
-      }
-      WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
-              incoming, incomingHasSv2 ? true : false);
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
-
-      try {
-        wrap.writeToStreamAndRetain(fos);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      batch.reconstructContainer(container);
-    }
+    container.clear();
 
-    @Override
-    protected void setupNewSchema() throws SchemaChangeException
-    {
-        /* Trace operator does not deal with hyper vectors yet */
-        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
-        /* we have a new schema, clear our existing container to
-         * load the new value vectors
-         */
-        container.clear();
-
-        /* Add all the value vectors in the container */
-        for(VectorWrapper<?> vv : incoming)
-        {
-            TransferPair tp = vv.getValueVector().getTransferPair();
-            container.add(tp.getTo());
-        }
+    /* Add all the value vectors in the container */
+    for (VectorWrapper<?> vv : incoming) {
+      TransferPair tp = vv.getValueVector().getTransferPair();
+      container.add(tp.getTo());
     }
+  }
 
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-        return sv;
-    }
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    return sv;
+  }
 
-    private String getFileName()
-    {
-        /* From the context, get the query id, major fragment id,
-         * minor fragment id. This will be used as the file name
-         * to which we will dump the incoming buffer data
-         */
-        FragmentHandle handle = incoming.getContext().getHandle();
+  private String getFileName() {
+    /*
+     * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
+     * which we will dump the incoming buffer data
+     */
+    FragmentHandle handle = incoming.getContext().getHandle();
 
-        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+    String qid = QueryIdHelper.getQueryId(handle.getQueryId());
 
-        int majorFragmentId = handle.getMajorFragmentId();
-        int minorFragmentId = handle.getMinorFragmentId();
+    int majorFragmentId = handle.getMajorFragmentId();
+    int minorFragmentId = handle.getMinorFragmentId();
 
-        String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+    String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
 
-        return fileName;
-    }
+    return fileName;
+  }
 
+  @Override
+  protected void cleanup() {
+    /* Release the selection vector */
+    if (sv != null)
+      sv.clear();
 
-    @Override
-    protected void cleanup()
-    {
-        /* Release the selection vector */
-        if (sv != null)
-          sv.clear();
-
-        /* Close the file descriptors */
-        try
-        {
-            fos.close();
-        } catch (IOException e)
-        {
-            logger.error("Unable to close file descriptors for file: " + getFileName());
-        }
+    /* Close the file descriptors */
+    try {
+      fos.close();
+    } catch (IOException e) {
+      logger.error("Unable to close file descriptors for file: " + getFileName());
     }
+  }
 
 }
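
For reference, a downstream reader can deserialize the batches this operator dumps using the same VectorAccessibleSerializable class. A minimal sketch, assuming the default DrillConfig and a hypothetical dump-file path (a real name follows the qid_major_minor_tag pattern built by getFileName() above):

    import java.io.FileInputStream;

    import org.apache.drill.common.config.DrillConfig;
    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorContainer;

    public class TraceDumpReader {
      public static void main(String[] args) throws Exception {
        // Allocator built the same way DumpCat builds it (assumes the default config suffices).
        BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
        FileInputStream input = new FileInputStream("/var/log/drill/<queryid>_0_0_trace");
        while (input.available() > 0) {
          VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(allocator);
          wrap.readFromStream(input);                       // one RecordBatchDef plus its buffers
          VectorContainer batch = (VectorContainer) wrap.get();
          System.out.println("records in batch: " + batch.getRecordCount());
          batch.zeroVectors();                              // release the buffers just loaded
        }
        input.close();
      }
    }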

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index a33ca37..e9b56db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -60,17 +60,15 @@ public class WritableBatch {
     return buffers;
   }
 
-  public void reconstructContainer(VectorContainer container)
-  {
-    Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
-    if (buffers.length > 0)    /* If we have ByteBuf's associated with value vectors */
-    {
-
+  public void reconstructContainer(VectorContainer container) {
+    Preconditions.checkState(!cleared,
+        "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+    if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
+      
       CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
 
-            /* Copy data from each buffer into the compound buffer */
-      for (ByteBuf buf : buffers)
-      {
+      /* Copy data from each buffer into the compound buffer */
+      for (ByteBuf buf : buffers) {
         cbb.addComponent(buf);
       }
 
@@ -78,13 +76,12 @@ public class WritableBatch {
 
       int bufferOffset = 0;
 
-            /* For each value vector slice up the appropriate size from
-             * the compound buffer and load it into the value vector
-             */
+      /*
+       * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+       */
       int vectorIndex = 0;
 
-      for(VectorWrapper<?> vv : container)
-      {
+      for (VectorWrapper<?> vv : container) {
         FieldMetadata fmd = fields.get(vectorIndex);
         ValueVector v = vv.getValueVector();
         v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -101,9 +98,8 @@ public class WritableBatch {
     }
     container.buildSchema(svMode);
 
-        /* Set the record count in the value vector */
-    for(VectorWrapper<?> v : container)
-    {
+    /* Set the record count in the value vector */
+    for (VectorWrapper<?> v : container) {
       ValueVector.Mutator m = v.getValueVector().getMutator();
       m.setValueCount(def.getRecordCount());
     }
@@ -118,23 +114,24 @@ public class WritableBatch {
 
   public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
     List<ValueVector> vectors = Lists.newArrayList();
-    for(VectorWrapper<?> vw : vws){
+    for (VectorWrapper<?> vw : vws) {
       Preconditions.checkArgument(!vw.isHyper());
       vectors.add(vw.getValueVector());
     }
     return getBatchNoHV(recordCount, vectors, isSV2);
   }
-  
+
   public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
     List<ByteBuf> buffers = Lists.newArrayList();
     List<FieldMetadata> metadata = Lists.newArrayList();
 
     for (ValueVector vv : vectors) {
       metadata.add(vv.getMetadata());
-      
-      // don't try to get the buffers if we don't have any records.  It is possible the buffers are dead buffers.
-      if(recordCount == 0) continue;
-      
+
+      // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
+      if (recordCount == 0)
+        continue;
+
       for (ByteBuf b : vv.getBuffers()) {
         buffers.add(b);
       }
@@ -142,14 +139,15 @@ public class WritableBatch {
       vv.clear();
     }
 
-    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
+        .setIsSelectionVector2(isSV2).build();
     WritableBatch b = new WritableBatch(batchDef, buffers);
     return b;
   }
-  
+
   public static WritableBatch get(RecordBatch batch) {
-    if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-        throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
+    if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+      throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
 
     boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
     return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
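
Taken together, the two diffs in this commit give the trace operator a serialize-and-rebuild round trip: the buffers are written to the dump stream but retained, then reloaded into the operator's own container. A condensed sketch of that pattern, using only calls shown above (imports and the surrounding operator class elided):

    /** Sketch of the doWork() flow above, not the committed code. */
    private void dumpAndRebuild(RecordBatch incoming, VectorContainer container,
                                OutputStream fos, FragmentContext context) throws IOException {
      SelectionVector2 sv =
          incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE
              ? incoming.getSelectionVector2() : null;

      // Wrap the incoming vectors; getBatchNoHVWrap rejects hyper vectors via Preconditions.
      WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, sv != null);

      // Serialize to the dump stream while keeping the underlying buffers alive...
      new VectorAccessibleSerializable(batch, sv, context.getAllocator()).writeToStreamAndRetain(fos);

      // ...so they can be reloaded into this operator's output container.
      batch.reconstructContainer(container);
    }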


[09/10] git commit: DRILL-278: fix run-time error when a join predicate uses CHAR types

Posted by ja...@apache.org.
DRILL-278: fix run-time error when a join predicate uses CHAR types


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c5916653
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c5916653
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c5916653

Branch: refs/heads/master
Commit: c591665343b7640d82397f5668f98b54c9e789a9
Parents: 8a571b3
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Fri Nov 8 09:23:22 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 09:23:22 2013 -0800

----------------------------------------------------------------------
 .../drill/exec/physical/impl/join/MergeJoinBatch.java       | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c5916653/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1f5c707..1e20e91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -340,13 +340,8 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
     }
 
     // check value equality
-    cg.getEvalBlock()._if(compareThisLeftExprHolder.getValue().eq(compareNextLeftExprHolder.getValue()))
-                     ._then()
-                       ._return(JExpr.lit(0));
-
-    // no match if reached
-    cg.getEvalBlock()._return(JExpr.lit(1));
-
+    FunctionCall g = new FunctionCall(ComparatorFunctions.COMPARE_TO, ImmutableList.of((LogicalExpression) new HoldingContainerExpression(compareThisLeftExprHolder), (LogicalExpression)  new HoldingContainerExpression(compareNextLeftExprHolder)), ExpressionPosition.UNKNOWN);
+    cg.addExpr(new ReturnValueExpression(g, false), false);
 
     // generate copyLeft()
     //////////////////////


[10/10] git commit: DRILL-266: Build tools to interpret the output dumped by the diagnostic operator.

Posted by ja...@apache.org.
DRILL-266: Build tools to interpret the output dumped by the diagnostic operator.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c287fa60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c287fa60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c287fa60

Branch: refs/heads/master
Commit: c287fa604f6206752200d72359d3293c87078010
Parents: c591665
Author: Jinfeng Ni <jn...@maprtech.com>
Authored: Fri Nov 8 16:50:45 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 16:50:45 2013 -0800

----------------------------------------------------------------------
 distribution/src/assemble/bin.xml               |   4 +
 distribution/src/resources/drill_dumpcat        |  56 ++++
 .../cache/VectorAccessibleSerializable.java     |   3 +-
 .../org/apache/drill/exec/client/DumpCat.java   | 292 +++++++++++++++++++
 .../drill/exec/client/QuerySubmitter.java       |  33 +--
 .../org/apache/drill/exec/util/VectorUtil.java  |  68 +++++
 .../apache/drill/exec/client/DumpCatTest.java   | 133 +++++++++
 7 files changed, 561 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index 26de847..fdd6c70 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -110,6 +110,10 @@
       <outputDirectory>bin</outputDirectory>
     </file>
     <file>
+      <source>src/resources/drill_dumpcat</source>
+      <outputDirectory>bin</outputDirectory>
+    </file>
+    <file>
       <source>src/resources/drill-override.conf</source>
       <outputDirectory>conf</outputDirectory>
     </file>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/distribution/src/resources/drill_dumpcat
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill_dumpcat b/distribution/src/resources/drill_dumpcat
new file mode 100755
index 0000000..1747c9a
--- /dev/null
+++ b/distribution/src/resources/drill_dumpcat
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin">/dev/null; pwd`
+
+. "$bin"/drill-config.sh
+
+if [ -z $JAVA_HOME ]
+then
+  JAVA=`which java`
+else
+  JAVA=`find -L $JAVA_HOME -name java | head -n 1`
+fi
+
+if [ -e $JAVA ]; then
+  echo ""
+else
+  echo "Java not found."
+  exit 1
+fi
+
+$JAVA -version 2>&1 | grep "version" | egrep -e "1.7" > /dev/null
+if [ $? -ne 0 ]; then
+  echo "Java 1.7 is required to run Apache Drill."
+  exit 1
+fi
+
+# get log directory
+if [ "$DRILL_LOG_DIR" = "" ]; then
+  export DRILL_LOG_DIR=/var/log/drill
+fi
+
+CP=$DRILL_HOME/jars/*:$CP
+CP=$DRILL_HOME/lib/*:$CP
+
+CP=$DRILL_CONF_DIR:$CP
+CP=$HADOOP_CLASSPATH:$CP
+
+DRILL_SHELL_JAVA_OPTS="$DRILL_SHELL_JAVA_OPTS -Dlog.path=$DRILL_LOG_DIR/drill_dumpcat.log"
+
+exec $JAVA $DRILL_SHELL_JAVA_OPTS $DRILL_JAVA_OPTS -cp $CP org.apache.drill.exec.client.DumpCat $@

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index e5bb94b..7b4bc23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -98,6 +98,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
     if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+
       if (sv2 == null) {
         sv2 = new SelectionVector2(allocator);
       }
@@ -197,7 +198,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   public VectorAccessible get() {
     return va;
   }
-
+  
   public SelectionVector2 getSv2() {
     return sv2;
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
new file mode 100644
index 0000000..ef0b1e1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DumpCat.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.util.VectorUtil;
+
+import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.internal.Lists;
+
+public class DumpCat {
+
+  private final static BufferAllocator allocator = BufferAllocator.getAllocator(DrillConfig.create());
+
+  public static void main(String args[]) throws Exception {
+    DumpCat dumpCat = new DumpCat();
+
+    Options o = new Options();
+    JCommander jc = null;
+    try {
+      jc = new JCommander(o, args);
+      jc.setProgramName("./drill_dumpcat");
+    } catch (ParameterException e) {
+      System.out.println(e.getMessage());
+      String[] valid = {"-f", "file"};
+      new JCommander(o, valid).usage();
+      jc.usage();
+      System.exit(-1);
+    }
+    if (o.help) {
+      jc.usage();
+      System.exit(0);
+    }
+
+    /*Check if dump file exists*/
+    File file = new File(o.location);
+    if (!file.exists()) {
+      System.out.println(String.format("Trace file %s not created", o.location));
+      System.exit(-1);
+    }
+ 
+    FileInputStream input = new FileInputStream(file.getAbsoluteFile());
+
+    if (o.batch < 0) {
+      dumpCat.doQuery(input);
+    } else {
+      dumpCat.doBatch(input, o.batch, o.include_headers);
+    }
+
+    input.close();
+  }
+
+  /**
+   * Used to ensure the param "batch" is a non-negative number.
+   */
+  public static class BatchNumValidator implements IParameterValidator {
+    @Override
+    public void validate(String name, String value) throws ParameterException {
+      try {
+        int batch = Integer.parseInt(value);
+        if(batch < 0) {
+          throw new ParameterException("Parameter " + name + " should be non-negative number.");
+        }
+      } catch (NumberFormatException e) {
+        throw new ParameterException("Parameter " + name + " should be non-negative number.");
+      }
+
+    }
+  }
+
+  /**
+   *  Options as input to JCommander.
+   */
+  static class Options {
+    @Parameter(names = {"-f"}, description = "file containing dump", required=true)
+    public String location = null;
+
+    @Parameter(names = {"-batch"}, description = "id of batch to show", required=false, validateWith = BatchNumValidator.class)
+    public int batch = -1;
+
+    @Parameter(names = {"-include-headers"}, description = "whether include header of batch", required=false)
+    public boolean include_headers = false;
+
+    @Parameter(names = {"-h", "-help", "--help"}, description = "show usage", help=true)
+    public boolean help = false;
+   }
+
+  /**
+   * Contains : # of rows, # of selected rows, data size (byte #).
+   */
+  private class BatchMetaInfo {
+    private long rows = 0;
+    private long selectedRows = 0;
+    private long dataSize = 0;
+
+    public BatchMetaInfo () {
+    }
+
+    public BatchMetaInfo (long rows, long selectedRows, long dataSize) {
+      this.rows = rows;
+      this.selectedRows = selectedRows;
+      this.dataSize = dataSize;
+    }
+
+    public void add(BatchMetaInfo info2) {
+      this.rows += info2.rows;
+      this.selectedRows += info2.selectedRows;
+      this.dataSize += info2.dataSize;
+    }
+
+    public String toString() {
+      String avgRecSizeStr = null;
+      if (this.rows>0)
+        avgRecSizeStr = String.format("Average Record Size : %d ", this.dataSize/this.rows);
+      else 
+        avgRecSizeStr = "Average Record Size : 0";
+      
+      return String.format("Records : %d / %d \n", this.selectedRows, this.rows) +
+             avgRecSizeStr + 
+             String.format("\n Total Data Size : %d", this.dataSize);
+    }
+  }
+
+  /**
+   * Querymode:
+   * $drill-dumpcat --file=local:///tmp/drilltrace/[queryid]_[tag]_[majorid]_[minor]_[operator]
+   *   Batches: 135
+   *   Records: 53,214/53,214 // the first one is the selected records.  The second number is the total number of records.
+   *   Selected Records: 53,214
+   *   Average Record Size: 74 bytes
+   *   Total Data Size: 12,345 bytes
+   *   Number of Empty Batches: 1
+   *   Schema changes: 1
+   *   Schema change batch indices: 0
+   * @throws Exception
+   */
+  protected void doQuery(FileInputStream input) throws Exception{
+    int  batchNum = 0;
+    int  emptyBatchNum = 0;
+    BatchSchema prevSchema = null;
+    List<Integer> schemaChangeIdx = Lists.newArrayList();
+
+    BatchMetaInfo aggBatchMetaInfo = new BatchMetaInfo();
+
+    while (input.available() > 0) {
+      VectorAccessibleSerializable vcSerializable = new VectorAccessibleSerializable(DumpCat.allocator);
+      vcSerializable.readFromStream(input);
+       VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+
+       aggBatchMetaInfo.add(getBatchMetaInfo(vcSerializable));
+
+       if (vectorContainer.getRecordCount() == 0) {
+         emptyBatchNum ++;
+       }
+
+       if (prevSchema != null && !vectorContainer.getSchema().equals(prevSchema))
+         schemaChangeIdx.add(batchNum);
+
+       prevSchema = vectorContainer.getSchema();
+       batchNum ++;
+ 
+       vectorContainer.zeroVectors();
+    }
+
+     /* output the summary stat */
+     System.out.println(String.format("Total # of batches: %d", batchNum));
+     //output: rows, selectedRows, avg rec size, total data size.
+     System.out.println(aggBatchMetaInfo.toString());
+     System.out.println(String.format("Empty batch : %d", emptyBatchNum));
+    System.out.println(String.format("Schema changes : %d", schemaChangeIdx.size()));
+    System.out.println(String.format("Schema change batch index : %s", schemaChangeIdx.toString()));
+  }
+
+  /**
+   * Batch mode:
+   * $drill-dumpcat --file=local:///tmp/drilltrace/[queryid]_[tag]_[majorid]_[minor]_[operator] --batch=123 --include-headers=true
+   * Records: 1/1
+   * Average Record Size: 8 bytes
+   * Total Data Size: 8 bytes
+   * Schema Information
+   * name: col1, minor_type: int4, data_mode: nullable
+   * name: col2, minor_type: int4, data_mode: non-nullable
+   * @param targetBatchNum
+   * @throws Exception
+   */
+  protected void doBatch(FileInputStream input, int targetBatchNum, boolean showHeader) throws Exception {
+    int batchNum = -1;
+
+    VectorAccessibleSerializable vcSerializable = null;
+
+    while (input.available() > 0 && batchNum ++ < targetBatchNum) {
+      vcSerializable = new VectorAccessibleSerializable(DumpCat.allocator);
+      vcSerializable.readFromStream(input);
+
+      if (batchNum != targetBatchNum) {
+        VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+        vectorContainer.zeroVectors();
+      }
+    }
+
+    if (batchNum < targetBatchNum) {
+      System.out.println(String.format("Wrong input of batch # ! Total # of batch in the file is %d. Please input a number 0..%d as batch #", batchNum+1, batchNum));
+      input.close();
+      System.exit(-1);
+    }
+
+    if (vcSerializable != null) {
+      showSingleBatch(vcSerializable, showHeader);
+      VectorContainer vectorContainer = (VectorContainer) vcSerializable.get();
+      vectorContainer.zeroVectors();
+    }
+  }
+
+
+  private void showSingleBatch (VectorAccessibleSerializable vcSerializable, boolean showHeader) {
+    VectorContainer vectorContainer = (VectorContainer)vcSerializable.get();
+
+    /* show the header of the batch */
+    if (showHeader) {
+      System.out.println(getBatchMetaInfo(vcSerializable).toString());
+
+      System.out.println("Schema Information");
+      for (VectorWrapper w : vectorContainer) {
+        MaterializedField field = w.getValueVector().getField();
+        System.out.println (String.format("name : %s, minor_type : %s, data_mode : %s",
+                                          field.getName(),
+                                          field.getType().getMinorType().toString(),
+                                          field.isNullable() ? "nullable":"non-nullable"
+                          ));
+      }
+    }
+
+    /* show the contents in the batch */
+    VectorUtil.showVectorAccessibleContent(vectorContainer);
+  }
+
+
+  /* Get batch meta info : rows, selectedRows, dataSize */
+  private BatchMetaInfo getBatchMetaInfo(VectorAccessibleSerializable vcSerializable) {
+    VectorAccessible vectorContainer = vcSerializable.get();
+
+    int rows =0;
+    int selectedRows = 0;
+    int totalDataSize = 0;
+
+    rows = vectorContainer.getRecordCount();
+    selectedRows = rows;
+
+    if (vectorContainer.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+      selectedRows = vcSerializable.getSv2().getCount();
+    }
+
+    for (VectorWrapper w : vectorContainer) {
+       totalDataSize += w.getValueVector().getBufferSize();
+    }
+
+    return new   BatchMetaInfo(rows, selectedRows, totalDataSize);
+  }
+
+}
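
Besides the drill_dumpcat wrapper script, the class can be driven programmatically from the same package (doQuery and doBatch are protected). A minimal sketch along the lines of DumpCatTest further below, with a hypothetical dump-file path:

    DumpCat dumpCat = new DumpCat();

    // Query mode: aggregate stats across every batch in the dump file.
    FileInputStream input = new FileInputStream("/var/log/drill/<queryid>_0_0_mock-scan");
    dumpCat.doQuery(input);
    input.close();

    // Batch mode: print batch 0 together with its header and schema block.
    input = new FileInputStream("/var/log/drill/<queryid>_0_0_mock-scan");
    dumpCat.doBatch(input, 0, true);
    input.close();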

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index c34ab1f..554fad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -25,6 +25,7 @@ import com.beust.jcommander.internal.Lists;
 import com.google.common.base.Charsets;
 import com.google.common.base.Stopwatch;
 import com.google.common.io.Resources;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -40,6 +41,7 @@ import org.apache.drill.exec.rpc.user.UserResultsListener;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.ValueVector;
 
 import java.io.IOException;
@@ -156,35 +158,12 @@ public class QuerySubmitter {
         } catch (SchemaChangeException e) {
           submissionFailed(new RpcException(e));
         }
-        List<String> columns = Lists.newArrayList();
-        for (VectorWrapper vw : loader) {
-          columns.add(vw.getValueVector().getField().getName());
-        }
-        width = columns.size();
-        for (int row = 0; row < rows; row++) {
-          if (row%50 == 0) {
-            System.out.println(StringUtils.repeat("-", width*17 + 1));
-            for (String column : columns) {
-              System.out.printf("| %-15s", width <= 15 ? column : column.substring(0, 14));
-            }
-            System.out.printf("|\n");
-            System.out.println(StringUtils.repeat("-", width*17 + 1));
-          }
-          for (VectorWrapper vw : loader) {
-            Object o = vw.getValueVector().getAccessor().getObject(row);
-            if (o instanceof byte[]) {
-              String value = new String((byte[]) o);
-              System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
-            } else {
-              String value = o.toString();
-              System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
-            }
-          }
-          System.out.printf("|\n");
-        }
+        
+        VectorUtil.showVectorAccessibleContent(loader);
       }
+      
       if (result.getHeader().getIsLastChunk()) {
-        System.out.println(StringUtils.repeat("-", width*17 + 1));
+        //System.out.println(StringUtils.repeat("-", width*17 + 1));
         latch.countDown();
       }
       result.release();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
new file mode 100644
index 0000000..8b23bcd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/VectorUtil.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.util;
+
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
+
+import com.beust.jcommander.internal.Lists;
+
+public class VectorUtil {
+
+  public static void showVectorAccessibleContent(VectorAccessible va) {
+
+    int rows = va.getRecordCount();
+    List<String> columns = Lists.newArrayList();
+    for (VectorWrapper vw : va) {
+      columns.add(vw.getValueVector().getField().getName());
+    }
+
+    int width = columns.size();
+    for (int row = 0; row < rows; row++) {
+      if (row%50 == 0) {
+        System.out.println(StringUtils.repeat("-", width*17 + 1));
+        for (String column : columns) {
+          System.out.printf("| %-15s", column.length() <= 15 ? column : column.substring(0, 14));
+        }
+        System.out.printf("|\n");
+        System.out.println(StringUtils.repeat("-", width*17 + 1));
+      }
+      for (VectorWrapper vw : va) {
+        Object o = vw.getValueVector().getAccessor().getObject(row);
+        if (o instanceof byte[]) {
+          String value = new String((byte[]) o);
+          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0, 14));
+        } else {
+          String value = o.toString();
+          System.out.printf("| %-15s",value.length() <= 15 ? value : value.substring(0,14));
+        }
+      }
+      System.out.printf("|\n");
+    }
+
+    if (rows > 0 )
+      System.out.println(StringUtils.repeat("-", width*17 + 1));
+  }
+
+
+}
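
Given the format strings above (each value is padded to 15 characters inside a 17-character cell, and a dashed rule of width*17 + 1 characters is printed every 50 rows), the rendered output for a hypothetical two-column batch would look roughly like:

    -----------------------------------
    | col1           | col2           |
    -----------------------------------
    | 1              | hello          |
    | 2              | world          |
    -----------------------------------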

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c287fa60/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
new file mode 100644
index 0000000..7a9784b
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.client;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.FileInputStream;
+
+import mockit.Injectable;
+import mockit.NonStrictExpectations;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.ExecConstants;
+
+import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
+import org.apache.drill.exec.physical.impl.SimpleRootExec;
+
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+
+import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import com.yammer.metrics.MetricRegistry;
+
+/**
+ * The unit test case will read a physical plan in json format. The physical plan contains a "trace" operator,
+ * which will produce a dump file.  The dump file will be input into DumpCat to test query mode and batch mode.
+ */
+
+public class DumpCatTest {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DumpCatTest.class);
+  DrillConfig c = DrillConfig.create();
+ 
+  @Test
+  public void testDumpCat(@Injectable final DrillbitContext bitContext, @Injectable UserClientConnection connection) throws Throwable
+  {
+
+      new NonStrictExpectations(){{
+          bitContext.getMetrics(); result = new MetricRegistry("test");
+          bitContext.getAllocator(); result = BufferAllocator.getAllocator(c);
+          bitContext.getConfig(); result = c;
+      }};
+
+      PhysicalPlanReader reader = new PhysicalPlanReader(c, c.getMapper(), CoordinationProtos.DrillbitEndpoint.getDefaultInstance());
+      PhysicalPlan plan = reader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile("/trace/simple_trace.json"), Charsets.UTF_8));
+      FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
+      FragmentContext context = new FragmentContext(bitContext, FragmentHandle.getDefaultInstance(), connection, null, registry);
+      SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next()));
+
+      while(exec.next()){
+      }
+
+      if(context.getFailureCause() != null){
+          throw context.getFailureCause();
+      }
+      assertTrue(!context.isFailed());
+
+      FragmentHandle handle = context.getHandle();
+
+      /* Form the file name to which the trace output will dump the record batches */
+      String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+      int majorFragmentId = handle.getMajorFragmentId();
+      int minorFragmentId = handle.getMinorFragmentId();
+
+      String logLocation = c.getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+      System.out.println("Found log location: " + logLocation);
+
+      String filename = String.format("%s//%s_%d_%d_mock-scan", logLocation, qid, majorFragmentId, minorFragmentId);
+
+      System.out.println("File Name: " + filename);
+
+      Configuration conf = new Configuration();
+      conf.set("fs.name.default", c.getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
+
+      FileSystem fs = FileSystem.get(conf);
+      Path path = new Path(filename);
+      assertTrue("Trace file does not exist", fs.exists(path));
+ 
+      DumpCat dumpCat = new DumpCat();
+ 
+      //Test Query mode
+      FileInputStream input = new FileInputStream(filename);
+ 
+      dumpCat.doQuery(input);
+      input.close();
+
+      //Test Batch mode
+      input = new FileInputStream(filename);
+      dumpCat.doBatch(input,0,true);
+
+      input.close();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception{
+      // pause to get logger to catch up.
+      Thread.sleep(1000);
+  }
+}


[06/10] git commit: DRILL-271: abstract out serialization in TraceRecordBatch by using VectorAccessibleSerializable. Add the ability to retain vectors.

Posted by ja...@apache.org.
DRILL-271: abstract out serialization in TraceRecordBatch by using VectorAccessibleSerializable. Add the ability to retain vectors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0ac0b19b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0ac0b19b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0ac0b19b

Branch: refs/heads/master
Commit: 0ac0b19b4bd986e6323799ce54453336821a17f7
Parents: 90c302d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Oct 30 18:22:13 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:46 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |   1 +
 .../cache/VectorAccessibleSerializable.java     | 105 +++++++++++-
 .../physical/impl/trace/TraceRecordBatch.java   | 168 +++----------------
 .../exec/record/AbstractSingleRecordBatch.java  |   1 +
 .../src/main/resources/drill-module.conf        |   3 +-
 .../impl/trace/TestTraceOutputDump.java         |  59 +++----
 6 files changed, 147 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 3aec702..36504f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -44,4 +44,5 @@ public interface ExecConstants {
   public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
   public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
   public static final String TRACE_DUMP_DIRECTORY = "drill.exec.trace.directory";
+  public static final String TRACE_DUMP_FILESYSTEM = "drill.exec.trace.filesystem";
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index f4a6998..62f8097 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -17,15 +17,18 @@
  */
 package org.apache.drill.exec.cache;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -46,6 +49,11 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
+  private int incomingRecordCount;
+  private VectorContainer retainedVectorContainer;
+  private SelectionVector2 retainedSelectionVector;
+
+  private boolean retain = false;
 
   /**
    *
@@ -54,6 +62,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
   public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
     this.va = va;
     this.allocator = allocator;
+    incomingRecordCount = va.getRecordCount();
   }
 
   public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
@@ -61,6 +70,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     this.allocator = allocator;
     this.sv2 = sv2;
     if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    incomingRecordCount = va.getRecordCount();
   }
 
   public VectorAccessibleSerializable(BufferAllocator allocator) {
@@ -108,7 +118,8 @@ public class VectorAccessibleSerializable implements DrillSerializable {
 
   @Override
   public void writeToStream(OutputStream output) throws IOException {
-    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+    Preconditions.checkNotNull(output);
+    final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
     WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
 
     ByteBuf[] incomingBuffers = batch.getBuffers();
@@ -147,7 +158,9 @@ public class VectorAccessibleSerializable implements DrillSerializable {
 
 //        fc.write(svBuf.nioBuffers());
         svBuf.getBytes(0, output, svBuf.readableBytes());
-        svBuf.release();
+        if (!retain) {
+          svBuf.release();
+        }
       }
 
             /* Dump the array of ByteBuf's associated with the value vectors */
@@ -161,11 +174,17 @@ public class VectorAccessibleSerializable implements DrillSerializable {
                  * we create a compound buffer
                  */
         totalBufferLength += buf.readableBytes();
-        buf.release();
+        if (!retain) {
+          buf.release();
+        }
       }
 
       output.flush();
-      context.stop();
+      if (retain) {
+        reconstructRecordBatch(batchDef, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+      }
+
+      timerContext.stop();
     } catch (IOException e)
     {
       throw new RuntimeException(e);
@@ -174,10 +193,88 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     }
   }
 
+  private void reconstructRecordBatch(UserBitShared.RecordBatchDef batchDef,
+                                      ByteBuf[] vvBufs, int totalBufferLength,
+                                      ByteBuf svBuf, int svCount, BatchSchema.SelectionVectorMode svMode)
+  {
+    VectorContainer container = retainedVectorContainer;
+    if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
+    {
+
+      CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+            /* Copy data from each buffer into the compound buffer */
+      for (int i = 0; i < vvBufs.length; i++)
+      {
+        cbb.addComponent(vvBufs[i]);
+      }
+
+      List<FieldMetadata> fields = batchDef.getFieldList();
+
+      int bufferOffset = 0;
+
+            /* For each value vector slice up the appropriate size from
+             * the compound buffer and load it into the value vector
+             */
+      int vectorIndex = 0;
+
+      for(VectorWrapper<?> vv : container)
+      {
+        FieldMetadata fmd = fields.get(vectorIndex);
+        ValueVector v = vv.getValueVector();
+        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+        vectorIndex++;
+        bufferOffset += fmd.getBufferLength();
+      }
+    }
+
+        /* Set the selection vector for the record batch if the
+         * incoming batch had a selection vector
+         */
+    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+    {
+      if (sv2 == null)
+        sv2 = new SelectionVector2(allocator);
+
+      sv2.setRecordCount(svCount);
+
+            /* create our selection vector from the
+             * incoming selection vector's buffer
+             */
+      sv2.setBuffer(svBuf);
+
+      svBuf.release();
+    }
+
+    container.buildSchema(svMode);
+
+        /* Set the record count in the value vector */
+    for(VectorWrapper<?> v : container)
+    {
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(incomingRecordCount);
+    }
+    retainedVectorContainer = container;
+  }
+
   private void clear() {
   }
 
   public VectorAccessible get() {
     return va;
   }
+
+  public void retain(VectorContainer container) {
+    this.retain = true;
+    this.retainedVectorContainer = container;
+  }
+
+  public VectorContainer getRetainedVectorContainer() {
+    return retainedVectorContainer;
+  }
+
+  public SelectionVector2 getSv2() {
+    return sv2;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index d2828cd..36c390c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -21,6 +21,7 @@ package org.apache.drill.exec.physical.impl.trace;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.Formatter;
 import com.google.common.collect.Iterators;
 import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -45,6 +47,10 @@ import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 
 import io.netty.buffer.ByteBuf;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /* TraceRecordBatch contains value vectors which are exactly the same
  * as the incoming record batch's value vectors. If the incoming
@@ -72,8 +78,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     private final String logLocation;
 
     /* File descriptors needed to be able to dump to log file */
-    private FileOutputStream fos;
-    private FileChannel fc;
+    private OutputStream fos;
 
     public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
     {
@@ -86,14 +91,12 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
         /* Create the log file we will dump to and initialize the file descriptors */
         try
         {
-            File file = new File(fileName);
+          Configuration conf = new Configuration();
+          conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+          FileSystem fs = FileSystem.get(conf);
 
             /* create the file */
-            file.createNewFile();
-
-            fos = new FileOutputStream(file, true);
-            fc  = fos.getChannel();
-
+          fos = fs.create(new Path(fileName));
         } catch (IOException e)
         {
             logger.error("Unable to create file: " + fileName);
@@ -124,35 +127,17 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     @Override
     protected void doWork()
     {
-        /* get the selection vector mode */
-        SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
-
-        /* Get the array of buffers from the incoming record batch */
-        WritableBatch batch = incoming.getWritableBatch();
-
-        ByteBuf[] incomingBuffers = batch.getBuffers();
-        RecordBatchDef batchDef = batch.getDef();
-
-        /* ByteBuf associated with the selection vector */
-        ByteBuf svBuf = null;
-
-        /* Size of the selection vector */
-        int svCount = 0;
-
-        if (svMode == SelectionVectorMode.TWO_BYTE)
-        {
-            SelectionVector2 sv2 = incoming.getSelectionVector2();
-            svCount = sv2.getCount();
-            svBuf = sv2.getBuffer();
-        }
-
-        /* Write the ByteBuf for the value vectors and selection vectors to disk
-         * totalBufferLength is the sum of size of all the ByteBuf across all value vectors
-         */
-        int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
-
-        /* Reconstruct the record batch from the ByteBuf's */
-        reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(incoming,
+            incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? incoming.getSelectionVector2() : null,
+            context.getAllocator());
+      wrap.retain(container);
+
+      try {
+        wrap.writeToStream(fos);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      sv = wrap.getSv2();
     }
 
     @Override
@@ -198,114 +183,6 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
         return fileName;
     }
 
-    private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
-    {
-        String fileName = getFileName();
-        int totalBufferLength = 0;
-
-        try
-        {
-            /* Write the metadata to the file */
-            batchDef.writeDelimitedTo(fos);
-
-            /* If we have a selection vector, dump it to file first */
-            if (svBuf != null)
-            {
-
-                /* For writing to the selection vectors we use
-                 * setChar() method which does not modify the
-                 * reader and writer index. To copy the entire buffer
-                 * without having to get each byte individually we need
-                 * to set the writer index
-                 */
-                svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-                fc.write(svBuf.nioBuffers());
-            }
-
-            /* Dump the array of ByteBuf's associated with the value vectors */
-            for (ByteBuf buf : vvBufs)
-            {
-                /* dump the buffer into the file channel */
-                fc.write(buf.nioBuffers());
-
-                /* compute total length of buffer, will be used when
-                 * we create a compound buffer
-                 */
-                totalBufferLength += buf.readableBytes();
-            }
-
-            fc.force(true);
-            fos.flush();
-
-        } catch (IOException e)
-        {
-            logger.error("Unable to write buffer to file: " + fileName);
-        }
-
-        return totalBufferLength;
-    }
-
-    private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
-                                        ByteBuf[] vvBufs, int totalBufferLength,
-                                        ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
-    {
-        if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
-        {
-            CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
-
-            /* Copy data from each buffer into the compound buffer */
-            for (int i = 0; i < vvBufs.length; i++)
-            {
-                cbb.addComponent(vvBufs[i]);
-            }
-
-            List<FieldMetadata> fields = batchDef.getFieldList();
-
-            int bufferOffset = 0;
-
-            /* For each value vector slice up the appropriate size from
-             * the compound buffer and load it into the value vector
-             */
-            int vectorIndex = 0;
-
-            for(VectorWrapper<?> vv : container)
-            {
-                FieldMetadata fmd = fields.get(vectorIndex);
-                ValueVector v = vv.getValueVector();
-                v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
-                vectorIndex++;
-                bufferOffset += fmd.getBufferLength();
-            }
-        }
-
-        /* Set the selection vector for the record batch if the
-         * incoming batch had a selection vector
-         */
-        if (svMode == SelectionVectorMode.TWO_BYTE)
-        {
-            if (sv == null)
-                sv = new SelectionVector2(context.getAllocator());
-
-            sv.setRecordCount(svCount);
-
-            /* create our selection vector from the
-             * incoming selection vector's buffer
-             */
-            sv.setBuffer(svBuf);
-
-            svBuf.release();
-        }
-
-        container.buildSchema(svMode);
-
-        /* Set the record count in the value vector */
-        for(VectorWrapper<?> v : container)
-        {
-            ValueVector.Mutator m = v.getValueVector().getMutator();
-            m.setValueCount(incoming.getRecordCount());
-        }
-    }
 
     @Override
     protected void cleanup()
@@ -318,7 +195,6 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
         try
         {
             fos.close();
-            fc.close();
         } catch (IOException e)
         {
             logger.error("Unable to close file descriptors for file: " + getFileName());

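The net effect of the patch above on the write path: the trace operator opens
a single Hadoop FileSystem stream in its constructor and hands each batch to
the serializable wrapper, which writes a delimited RecordBatchDef followed by
the raw buffers. A minimal sketch of that flow, assuming the class and method
names below (TraceDumpSketch and dump() are hypothetical; only the Drill and
Hadoop calls are taken from the patch):

    import java.io.OutputStream;

    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.selection.SelectionVector2;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TraceDumpSketch {
      /* sv2 may be null when the incoming batch carries no selection vector */
      public static void dump(VectorAccessible batch, SelectionVector2 sv2,
                              BufferAllocator allocator, String fileName)
          throws Exception {
        /* fs.default.name would normally come from drill.exec.trace.filesystem */
        Configuration conf = new Configuration();
        conf.set("fs.default.name", "file:///");
        FileSystem fs = FileSystem.get(conf);

        /* The real operator creates this stream once and keeps it open
         * across batches; this sketch writes a single batch and closes. */
        OutputStream out = fs.create(new Path(fileName));
        VectorAccessibleSerializable wrap =
            new VectorAccessibleSerializable(batch, sv2, allocator);
        wrap.writeToStream(out);
        out.close();
      }
    }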
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index acc6c9d..1639940 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -47,6 +47,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
     case NOT_YET:
     case STOP:
       container.zeroVectors();
+      cleanup();
       return upstream;
     case OK_NEW_SCHEMA:
       try{

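One subtlety in this small change: routing NOT_YET and STOP through cleanup()
means an operator holding external resources, such as the trace operator's
dump-file stream above, presumably has them released even when the upstream
terminates early, rather than only on the normal end-of-data path.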
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index ca78eb5..725c6b4 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -68,6 +68,7 @@ drill.exec: {
     executor.threads: 4
   }
   trace: {
-    directory: "/var/log/drill"
+    directory: "/var/log/drill",
+    filesystem: "file:///"
   }
 }
\ No newline at end of file

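Since trace.filesystem is an ordinary Hadoop FileSystem URI, the dump target
can be redirected without code changes. For illustration only (the namenode
address below is invented), a site override might look like:

    drill.exec: {
      trace: {
        directory: "/drill/trace",
        filesystem: "hdfs://namenode:8020/"
      }
    }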
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0ac0b19b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index 92faf8d..4c04abc 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -19,13 +19,13 @@ package org.apache.drill.exec.physical.impl.trace;
 
 import static org.junit.Assert.*;
 
-import io.netty.buffer.ByteBufInputStream;
 import mockit.Injectable;
 import mockit.NonStrictExpectations;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -37,10 +37,14 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -48,15 +52,6 @@ import com.google.common.base.Charsets;
 import com.google.common.io.Files;
 import com.yammer.metrics.MetricRegistry;
 
-import java.nio.channels.FileChannel;
-import java.nio.ByteBuffer;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.FileInputStream;
-
-import io.netty.buffer.ByteBuf;
-
 /*
  * This test uses a simple physical plan with a mock-scan that
  * generates one row. The physical plan also consists of the
@@ -112,44 +107,30 @@ public class TestTraceOutputDump {
 
         System.out.println("Found log location: " + logLocation);
 
-        String filename = new String(logLocation + "/" + "mock-scan" + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
-
-        System.out.println("File Name: " + filename);
+      String filename = String.format("%s//%s_%d_%d_mock-scan", logLocation, qid, majorFragmentId, minorFragmentId);
 
-        File file = new File(filename);
+      System.out.println("File Name: " + filename);
 
-        if (!file.exists())
-            throw new IOException("Trace file not created");
-
-        FileInputStream input = new FileInputStream(file.getAbsoluteFile());
-        FileChannel fc = input.getChannel();
-        int size = (int) fc.size();
-        BufferAllocator allocator = context.getAllocator();
-        ByteBuffer buffer = ByteBuffer.allocate((int) fc.size());
-        ByteBuf buf = allocator.buffer(size);
-
-        int readSize;
-
-        /* Read the file into a ByteBuffer and transfer it into our ByteBuf */
-        while ((readSize = (fc.read(buffer))) > 0)
-        {
-            buffer.position(0).limit(readSize);
-            buf.writeBytes(buffer);
-            buffer.clear();
-        }
+        Configuration conf = new Configuration();
+      conf.set("fs.name.default", c.getString(ExecConstants.TRACE_DUMP_FILESYSTEM));
 
-        final ByteBufInputStream is = new ByteBufInputStream(buf, buf.readableBytes());
+        FileSystem fs = FileSystem.get(conf);
+      Path path = new Path(filename);
+      assertTrue("Trace file does not exist", fs.exists(path));
+      FSDataInputStream in = fs.open(path);
 
-        RecordBatchDef batchDef = RecordBatchDef.parseDelimitedFrom(is);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(context.getAllocator());
+      wrap.readFromStream(in);
+      VectorAccessible container = wrap.get();
 
         /* Assert there are no selection vectors */
-        assertTrue(!batchDef.getIsSelectionVector2());
+      assertTrue(wrap.getSv2() == null);
 
         /* Assert there is only one record */
-        assertTrue(batchDef.getRecordCount() == 1);
+        assertTrue(container.getRecordCount() == 1);
 
         /* Read the Integer value and ASSERT its Integer.MIN_VALUE */
-        int value = buf.getInt(buf.readerIndex());
+        int value = (int) container.iterator().next().getValueVector().getAccessor().getObject(0);
         assertTrue(value == Integer.MIN_VALUE);
     }
 

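Reading a dump back, as the revised test does, is the mirror image: open the
file through the same FileSystem abstraction, let the wrapper parse the
delimited RecordBatchDef and rebuild the vectors, then walk the container. A
condensed sketch under the same assumptions (TraceReadSketch and the file
name argument are illustrative; the bare Configuration defaults to the local
filesystem):

    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.VectorWrapper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class TraceReadSketch {
      public static void read(BufferAllocator allocator, String fileName)
          throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream in = fs.open(new Path(fileName));

        VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(allocator);
        wrap.readFromStream(in);

        VectorAccessible container = wrap.get();
        for (VectorWrapper<?> w : container) {
          /* print the first value of each reconstructed vector */
          System.out.println(w.getValueVector().getAccessor().getObject(0));
        }
        in.close();
      }
    }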

[03/10] git commit: DRILL-271 Retool VectorContainerSerializable to work with containers and batches; also modify DrillSerializable to work with InputStream and OutputStream

Posted by ja...@apache.org.
DRILL-271 Retool VectorContainerSerializable to work with containers and batches; also modify DrillSerializable to work with InputStream and OutputStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/266d2483
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/266d2483
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/266d2483

Branch: refs/heads/master
Commit: 266d24836d079b5d4ddba013f04a79716da24d86
Parents: 30ada5d
Author: Steven Phillips <sp...@maprtech.com>
Authored: Mon Oct 28 17:34:05 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:31 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/cache/DrillSerializable.java     |  10 +-
 .../exec/cache/VectorContainerSerializable.java | 156 ++++++++++++++-----
 .../drill/exec/client/QuerySubmitter.java       |   2 +-
 .../OrderedPartitionRecordBatch.java            |   2 +
 .../drill/exec/cache/TestVectorCache.java       |   4 +-
 5 files changed, 128 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
index 534d781..875e8b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/DrillSerializable.java
@@ -17,15 +17,15 @@
  */
 package org.apache.drill.exec.cache;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
+import java.io.*;
 
 /**
  * Classes that can be put in the Distributed Cache must implement this interface.
  */
 public interface DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSerializable.class);
-  public void read(DataInput arg0) throws IOException;
-  public void write (DataOutput arg0) throws IOException;
+  public void read(DataInput input) throws IOException;
+  public void readFromStream(InputStream input) throws IOException;
+  public void write(DataOutput output) throws IOException;
+  public void writeToStream(OutputStream output) throws IOException;
 }

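The interface now supports both the DataInput/DataOutput path used by the
distributed cache and a raw stream path. A minimal sketch of an implementer,
assuming the bridging helpers used in the next file (the StringHolder class
itself is hypothetical):

    import java.io.*;

    import org.apache.drill.common.util.DataInputInputStream;
    import org.apache.drill.common.util.DataOutputOutputStream;
    import org.apache.drill.exec.cache.DrillSerializable;

    public class StringHolder implements DrillSerializable {
      private String value;

      /* The DataInput/DataOutput variants delegate to the stream variants,
       * mirroring how VectorContainerSerializable bridges the two below. */
      public void read(DataInput input) throws IOException {
        readFromStream(DataInputInputStream.constructInputStream(input));
      }
      public void write(DataOutput output) throws IOException {
        writeToStream(DataOutputOutputStream.constructOutputStream(output));
      }
      public void readFromStream(InputStream input) throws IOException {
        value = new DataInputStream(input).readUTF();
      }
      public void writeToStream(OutputStream output) throws IOException {
        new DataOutputStream(output).writeUTF(value);
      }
    }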
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
index 1e6eeac..5813dd6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorContainerSerializable.java
@@ -18,15 +18,18 @@
 package org.apache.drill.exec.cache;
 
 import com.google.common.collect.Lists;
+import com.yammer.metrics.MetricRegistry;
+import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.*;
+import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -37,72 +40,147 @@ import java.util.List;
 
 public class VectorContainerSerializable implements DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerSerializable.class);
+  static final MetricRegistry metrics = DrillMetrics.getInstance();
+  static final String WRITER_TIMER = MetricRegistry.name(VectorContainerSerializable.class, "writerTime");
 
-//  List<ValueVector> vectorList;
-  private VectorContainer container;
+  private VectorAccessible va;
   private BootStrapContext context;
-  private int listSize = 0;
   private int recordCount = -1;
+  private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
+  private SelectionVector2 sv2;
 
   /**
    *
-   * @param container
+   * @param va
    */
-  public VectorContainerSerializable(VectorContainer container){
-    this.container = container;
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-    this.context = new BootStrapContext(DrillConfig.create());
-    this.listSize = container.getNumberOfColumns();
+  public VectorContainerSerializable(VectorAccessible va){
+    this.va = va;
+    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
+  }
+
+  public VectorContainerSerializable(VectorAccessible va, SelectionVector2 sv2, FragmentContext context) {
+    this.va = va;
+    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
+    this.sv2 = sv2;
+    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
   }
 
   public VectorContainerSerializable() {
-    this.container = new VectorContainer();
-    this.context = new BootStrapContext(DrillConfig.create());
+    this.va = new VectorContainer();
+    this.context = new BootStrapContext(DrillConfig.getDefaultInstance());
   }
-  
+
   @Override
   public void read(DataInput input) throws IOException {
+    readFromStream(DataInputInputStream.constructInputStream(input));
+  }
+  
+  @Override
+  public void readFromStream(InputStream input) throws IOException {
+    VectorContainer container = new VectorContainer();
+    UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
+      sv2.allocateNew(recordCount * 2);
+      sv2.getBuffer().setBytes(0, input, recordCount * 2);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
     List<ValueVector> vectorList = Lists.newArrayList();
-    int size = input.readInt();
-    InputStream stream = DataInputInputStream.constructInputStream(input);
-    for (int i = 0; i < size; i++) {
-      FieldMetadata metaData = FieldMetadata.parseDelimitedFrom(stream);
-      if (recordCount == -1) recordCount = metaData.getValueCount();
+    List<FieldMetadata> fieldList = batchDef.getFieldList();
+    for (FieldMetadata metaData : fieldList) {
       int dataLength = metaData.getBufferLength();
       byte[] bytes = new byte[dataLength];
-      input.readFully(bytes);
+      input.read(bytes);
       MaterializedField field = MaterializedField.create(metaData.getDef());
       ByteBuf buf = context.getAllocator().buffer(dataLength);
       buf.setBytes(0, bytes);
       ValueVector vector = TypeHelper.getNewVector(field, context.getAllocator());
       vector.load(metaData, buf);
-      vectorList.add((BaseDataValueVector) vector);
+      vectorList.add(vector);
     }
     container.addCollection(vectorList);
-    container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+    container.buildSchema(svMode);
     container.setRecordCount(recordCount);
-    this.listSize = vectorList.size();
+    va = container;
   }
 
   @Override
   public void write(DataOutput output) throws IOException {
-//    int size = vectorList.size();
-    output.writeInt(listSize);
-    for (VectorWrapper w : container) {
-      OutputStream stream = DataOutputOutputStream.constructOutputStream(output);
-      ValueVector vector = w.getValueVector();
-      if (recordCount == -1) container.setRecordCount(vector.getMetadata().getValueCount());
-      vector.getMetadata().writeDelimitedTo(stream);
-      ByteBuf[] data = vector.getBuffers();
-      for (ByteBuf bb : data) {
-        byte[] bytes = new byte[bb.readableBytes()];
-        bb.getBytes(0, bytes);
-        stream.write(bytes);
+    writeToStream(DataOutputOutputStream.constructOutputStream(output));
+  }
+
+  @Override
+  public void writeToStream(OutputStream output) throws IOException {
+    final Timer.Context context = metrics.timer(WRITER_TIMER).time();
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
+
+    ByteBuf[] incomingBuffers = batch.getBuffers();
+    UserBitShared.RecordBatchDef batchDef = batch.getDef();
+
+    /* ByteBuf associated with the selection vector */
+    ByteBuf svBuf = null;
+
+    /* Size of the selection vector */
+    int svCount = 0;
+
+    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
+    {
+      svCount = sv2.getCount();
+      svBuf = sv2.getBuffer();
+    }
+
+    int totalBufferLength = 0;
+
+    try
+    {
+      /* Write the metadata to the stream */
+      batchDef.writeDelimitedTo(output);
+
+      /* If we have a selection vector, dump it to the stream first */
+      if (svBuf != null)
+      {
+        /* For writing the selection vector we use the setChar()
+         * method, which does not modify the reader and writer
+         * index. To copy the entire buffer without having to get
+         * each byte individually we need to set the writer index.
+         */
+        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
+
+        svBuf.getBytes(0, output, svBuf.readableBytes());
+        svBuf.release();
       }
+
+      /* Dump the array of ByteBufs associated with the value vectors */
+      for (ByteBuf buf : incomingBuffers)
+      {
+        /* dump the buffer into the output stream */
+        int bufLength = buf.readableBytes();
+        buf.getBytes(0, output, bufLength);
+
+        /* compute the total buffer length; used when the batch
+         * is later reconstructed from a compound buffer
+         */
+        totalBufferLength += bufLength;
+        buf.release();
+      }
+
+      output.flush();
+      context.stop();
+    } catch (IOException e)
+    {
+      throw new RuntimeException(e);
+    } finally {
+      clear();
     }
   }
 
-  public VectorContainer get() {
-    return container;
+  private void clear() {
+  }
+
+  public VectorAccessible get() {
+    return va;
   }
 }

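Taken together, readFromStream() and writeToStream() define a symmetric
format: a delimited RecordBatchDef header, an optional selection-vector
buffer, then each value vector buffer in field order. A round-trip sketch
over a byte array (RoundTripSketch is illustrative; in practice the container
comes from a real query, and note that writeToStream() releases the source
buffers):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.drill.exec.cache.VectorContainerSerializable;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.VectorContainer;

    public class RoundTripSketch {
      public static VectorAccessible roundTrip(VectorContainer container)
          throws Exception {
        /* container.setRecordCount() must have been called, since
         * writeToStream() derives the batch from va.getRecordCount() */
        VectorContainerSerializable out = new VectorContainerSerializable(container);
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        out.writeToStream(bytes);

        VectorContainerSerializable in = new VectorContainerSerializable();
        in.readFromStream(new ByteArrayInputStream(bytes.toByteArray()));
        return in.get();   // the reconstructed container
      }
    }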
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
index 160ef7f..c34ab1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java
@@ -149,8 +149,8 @@ public class QuerySubmitter {
     @Override
     public void resultArrived(QueryResultBatch result) {
       int rows = result.getHeader().getRowCount();
-      count.addAndGet(rows);
       if (result.getData() != null) {
+        count.addAndGet(rows);
         try {
           loader.load(result.getHeader().getDef(), result.getData());
         } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 7dc7d55..ef3886c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -147,6 +147,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       for (VectorWrapper vw : containerToCache) {
         vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
       }
+      containerToCache.setRecordCount(copier.getOutputRecords());
 
       // Get a distributed multimap handle from the distributed cache, and put the vectors from the new vector container
       // into a serializable wrapper object, and then add to distributed map
@@ -236,6 +237,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
     for (VectorWrapper vw : candidatePartitionTable) {
       vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
     }
+    candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
 
     VectorContainerSerializable wrap = new VectorContainerSerializable(candidatePartitionTable);
 

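These two setRecordCount() calls are not incidental: the rewritten
writeToStream() above builds its WritableBatch from va.getRecordCount(), so a
container cached without an explicit record count would serialize a bogus
batch definition. Setting the count on the container right after the vector
mutators is what keeps the cache path consistent.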
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/266d2483/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index 39ec720..ffc0274 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.server.Drillbit;
@@ -66,13 +67,14 @@ public class TestVectorCache {
 
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
+    container.setRecordCount(4);
     VectorContainerSerializable wrap = new VectorContainerSerializable(container);
 
     DistributedMultiMap<VectorContainerSerializable> mmap = cache.getMultiMap(VectorContainerSerializable.class);
     mmap.put("vectors", wrap);
     VectorContainerSerializable newWrap = (VectorContainerSerializable)mmap.get("vectors").iterator().next();
 
-    VectorContainer newContainer = newWrap.get();
+    VectorAccessible newContainer = newWrap.get();
     for (VectorWrapper w : newContainer) {
       ValueVector vv = w.getValueVector();
       int values = vv.getAccessor().getValueCount();


[02/10] git commit: Addressed review comments from Jacques

Posted by ja...@apache.org.
Addressed review comments from Jacques


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/30ada5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/30ada5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/30ada5de

Branch: refs/heads/master
Commit: 30ada5decc9ab92fb95d075f2d01ab1387ce8c22
Parents: 6c78890
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 23:19:36 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:19 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/TraceInjector.java |   2 +-
 .../physical/impl/trace/TraceRecordBatch.java   | 281 ++++++++++++-------
 .../exec/record/AbstractSingleRecordBatch.java  |   2 +-
 .../apache/drill/exec/proto/UserBitShared.java  |  42 +--
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +-
 5 files changed, 199 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 3e82a73..9c859a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -74,7 +74,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
         /* For every child operator create a trace operator as its parent */
         for (int i = 0; i < newChildren.size(); i++)
         {
-            String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+            String traceTag = newChildren.get(i).toString() + Integer.toString(traceTagCount++);
 
             /* Trace operator */
             Trace traceOp = new Trace(newChildren.get(i), traceTag);

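A small but real fix: the previous code built every trace tag from
newChildren.toString(), the string form of the whole child list, so all
injected trace operators in a plan shared essentially the same tag, differing
only in the counter. Using newChildren.get(i).toString() derives each tag
from the operator's own child.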
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 97131cd..d2828cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Formatter;
 
+import com.google.common.collect.Iterators;
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -43,6 +46,19 @@ import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 
 import io.netty.buffer.ByteBuf;
 
+/* TraceRecordBatch contains value vectors which are exactly the same
+ * as the incoming record batch's value vectors. If the incoming
+ * record batch has a selection vector (type 2) then TraceRecordBatch
+ * will also contain a selection vector.
+ *
+ * The purpose of this record batch is to dump the data associated with
+ * all the value vectors and the selection vector to disk.
+ *
+ * This record batch does not modify any data or schema; it simply
+ * consumes the incoming record batch's data, dumps it to disk, and
+ * passes the same set of value vectors (and selection vectors) to its
+ * parent record batch.
+ */
 public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
 {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
@@ -55,11 +71,33 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     /* Location where the log should be dumped */
     private final String logLocation;
 
+    /* File descriptors needed to be able to dump to log file */
+    private FileOutputStream fos;
+    private FileChannel fc;
+
     public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
     {
         super(pop, context, incoming);
         this.traceTag = pop.traceTag;
         logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+        String fileName = getFileName();
+
+        /* Create the log file we will dump to and initialize the file descriptors */
+        try
+        {
+            File file = new File(fileName);
+
+            /* create the file */
+            file.createNewFile();
+
+            fos = new FileOutputStream(file, true);
+            fc  = fos.getChannel();
+
+        } catch (IOException e)
+        {
+            logger.error("Unable to create file: " + fileName);
+        }
     }
 
     @Override
@@ -75,6 +113,13 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
      * Function is invoked for every record batch and it simply
      * dumps the buffers associated with all the value vectors in
      * this record batch to a log file.
+     *
+     * Function is divided into three main parts
+     *   1. Get all the buffers (ByteBufs) associated with the incoming
+     *      record batch's value vectors and selection vector
+     *   2. Dump these buffers to the log file (performed by writeToFile())
+     *   3. Construct the record batch with these buffers to look exactly like
+     *      the incoming record batch (performed by reconstructRecordBatch())
      */
     @Override
     protected void doWork()
@@ -85,37 +130,87 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
         /* Get the array of buffers from the incoming record batch */
         WritableBatch batch = incoming.getWritableBatch();
 
-        BufferAllocator allocator = context.getAllocator();
         ByteBuf[] incomingBuffers = batch.getBuffers();
         RecordBatchDef batchDef = batch.getDef();
 
-        /* Total length of buffers across all value vectors */
-        int totalBufferLength = 0;
+        /* ByteBuf associated with the selection vector */
+        ByteBuf svBuf = null;
 
-        String fileName = getFileName();
+        /* Size of the selection vector */
+        int svCount = 0;
 
-        try
+        if (svMode == SelectionVectorMode.TWO_BYTE)
         {
-            File file = new File(fileName);
+            SelectionVector2 sv2 = incoming.getSelectionVector2();
+            svCount = sv2.getCount();
+            svBuf = sv2.getBuffer();
+        }
 
-            if (!file.exists())
-                file.createNewFile();
+        /* Write the ByteBuf for the value vectors and selection vectors to disk
+         * totalBufferLength is the sum of size of all the ByteBuf across all value vectors
+         */
+        int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
 
-            FileOutputStream fos = new FileOutputStream(file, true);
+        /* Reconstruct the record batch from the ByteBuf's */
+        reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+    }
+
+    @Override
+    protected void setupNewSchema() throws SchemaChangeException
+    {
+        /* Trace operator does not deal with hyper vectors yet */
+        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+        /* we have a new schema, clear our existing container to
+         * load the new value vectors
+         */
+        container.clear();
 
+        /* Add all the value vectors in the container */
+        for(VectorWrapper<?> vv : incoming)
+        {
+            TransferPair tp = vv.getValueVector().getTransferPair();
+            container.add(tp.getTo());
+        }
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+        return sv;
+    }
+
+    private String getFileName()
+    {
+        /* From the context, get the query id, major fragment id,
+         * minor fragment id. This will be used as the file name
+         * to which we will dump the incoming buffer data
+         */
+        FragmentHandle handle = incoming.getContext().getHandle();
+
+        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+        int majorFragmentId = handle.getMajorFragmentId();
+        int minorFragmentId = handle.getMinorFragmentId();
+
+        String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+
+        return fileName;
+    }
+
+    private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
+    {
+        String fileName = getFileName();
+        int totalBufferLength = 0;
+
+        try
+        {
             /* Write the metadata to the file */
             batchDef.writeDelimitedTo(fos);
 
-            FileChannel fc = fos.getChannel();
-
             /* If we have a selection vector, dump it to file first */
-            if (svMode == SelectionVectorMode.TWO_BYTE)
+            if (svBuf != null)
             {
-                SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
-                int recordCount = incomingSV2.getCount();
-                int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
-
-                ByteBuf buf = incomingSV2.getBuffer();
 
                 /* For writing to the selection vectors we use
                  * setChar() method which does not modify the
@@ -123,32 +218,16 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
                  * without having to get each byte individually we need
                  * to set the writer index
                  */
-                buf.writerIndex(sv2Size);
-
-                /* dump the selection vector to log */
-                dumpByteBuf(fc, buf);
-
-                if (sv == null)
-                    sv = new SelectionVector2(allocator);
+                svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
 
-                sv.setRecordCount(recordCount);
-
-                /* create our selection vector from the
-                 * incoming selection vector's buffer
-                 */
-                sv.setBuffer(buf);
-
-                buf.release();
+                fc.write(svBuf.nioBuffers());
             }
 
-            /* For each buffer dump it to log and compute total length */
-            for (ByteBuf buf : incomingBuffers)
+            /* Dump the array of ByteBuf's associated with the value vectors */
+            for (ByteBuf buf : vvBufs)
             {
                 /* dump the buffer into the file channel */
-                dumpByteBuf(fc, buf);
-
-                /* Reset reader index on the ByteBuf so we can read it again */
-                buf.resetReaderIndex();
+                fc.write(buf.nioBuffers());
 
                 /* compute total length of buffer, will be used when
                  * we create a compound buffer
@@ -156,41 +235,66 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
                 totalBufferLength += buf.readableBytes();
             }
 
-            fc.close();
-            fos.close();
+            fc.force(true);
+            fos.flush();
 
         } catch (IOException e)
         {
             logger.error("Unable to write buffer to file: " + fileName);
         }
 
-        /* allocate memory for the compound buffer, compound buffer
-         * will contain the data from all the buffers across all the
-         * value vectors
-         */
-        ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+        return totalBufferLength;
+    }
 
-        /* Copy data from each buffer into the compound buffer */
-        for (int i = 0; i < incomingBuffers.length; i++)
+    private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
+                                        ByteBuf[] vvBufs, int totalBufferLength,
+                                        ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
+    {
+        if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
         {
-            byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
-        }
+            CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+            /* Copy data from each buffer into the compound buffer */
+            for (int i = 0; i < vvBufs.length; i++)
+            {
+                cbb.addComponent(vvBufs[i]);
+            }
 
-        List<FieldMetadata> fields = batchDef.getFieldList();
+            List<FieldMetadata> fields = batchDef.getFieldList();
 
-        int bufferOffset = 0;
+            int bufferOffset = 0;
 
-        /* For each value vector slice up the appropriate size from
-         * the compound buffer and load it into the value vector
+            /* For each value vector slice up the appropriate size from
+             * the compound buffer and load it into the value vector
+             */
+            int vectorIndex = 0;
+
+            for(VectorWrapper<?> vv : container)
+            {
+                FieldMetadata fmd = fields.get(vectorIndex);
+                ValueVector v = vv.getValueVector();
+                v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+                vectorIndex++;
+                bufferOffset += fmd.getBufferLength();
+            }
+        }
+
+        /* Set the selection vector for the record batch if the
+         * incoming batch had a selection vector
          */
-        int vectorIndex = 0;
-        for(VectorWrapper<?> vv : container)
+        if (svMode == SelectionVectorMode.TWO_BYTE)
         {
-            FieldMetadata fmd = fields.get(vectorIndex);
-            ValueVector v = vv.getValueVector();
-            v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
-            vectorIndex++;
-            bufferOffset += fmd.getBufferLength();
+            if (sv == null)
+                sv = new SelectionVector2(context.getAllocator());
+
+            sv.setRecordCount(svCount);
+
+            /* create our selection vector from the
+             * incoming selection vector's buffer
+             */
+            sv.setBuffer(svBuf);
+
+            svBuf.release();
         }
 
         container.buildSchema(svMode);
@@ -204,56 +308,21 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     }
 
     @Override
-    protected void setupNewSchema() throws SchemaChangeException
+    protected void cleanup()
     {
-        /* Trace operator does not deal with hyper vectors yet */
-        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
-        /* we have a new schema, clear our existing container to
-         * load the new value vectors
-         */
-        container.clear();
+        /* Release the selection vector */
+        if (sv != null)
+          sv.clear();
 
-        /* Add all the value vectors in the container */
-        for(VectorWrapper<?> vv : incoming)
+        /* Close the file descriptors */
+        try
         {
-            TransferPair tp = vv.getValueVector().getTransferPair();
-            container.add(tp.getTo());
+            fos.close();
+            fc.close();
+        } catch (IOException e)
+        {
+            logger.error("Unable to close file descriptors for file: " + getFileName());
         }
     }
 
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-        return sv;
-    }
-
-    private String getFileName()
-    {
-        /* From the context, get the query id, major fragment id,
-         * minor fragment id. This will be used as the file name
-         * to which we will dump the incoming buffer data
-         */
-        FragmentHandle handle = incoming.getContext().getHandle();
-
-        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
-
-        int majorFragmentId = handle.getMajorFragmentId();
-        int minorFragmentId = handle.getMinorFragmentId();
-
-        return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
-    }
-
-    private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
-    {
-        int bufferLength = buf.readableBytes();
-
-        byte[] byteArray = new byte[bufferLength];
-
-        /* Transfer bytes to a byte array */
-        buf.readBytes(byteArray);
-
-        /* Drop the byte array into the file channel */
-        fc.write(ByteBuffer.wrap(byteArray));
-    }
 }

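With getFileName() centralized, each trace operator writes to a single
well-known path of the form
logLocation//queryId_majorFragmentId_minorFragmentId_traceTag, which is the
path the TestTraceOutputDump changes earlier in this thread reconstruct. The
doubled slash comes from the "%s//%s_..." format string; filesystems collapse
it, so it is harmless if slightly untidy.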
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 25284b6..acc6c9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       throw new UnsupportedOperationException();
     }
   }
-  
+
   protected abstract void setupNewSchema() throws SchemaChangeException;
   protected abstract void doWork();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index f305c00..60dd6fd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2910,13 +2910,13 @@ public final class UserBitShared {
      */
     int getRecordCount();
 
-    // optional bool isSelectionVector2 = 3;
+    // optional bool is_selection_vector_2 = 3;
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     boolean hasIsSelectionVector2();
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     boolean getIsSelectionVector2();
   }
@@ -3084,17 +3084,17 @@ public final class UserBitShared {
       return recordCount_;
     }
 
-    // optional bool isSelectionVector2 = 3;
-    public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+    // optional bool is_selection_vector_2 = 3;
+    public static final int IS_SELECTION_VECTOR_2_FIELD_NUMBER = 3;
     private boolean isSelectionVector2_;
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     public boolean hasIsSelectionVector2() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     public boolean getIsSelectionVector2() {
       return isSelectionVector2_;
@@ -3667,22 +3667,22 @@ public final class UserBitShared {
         return this;
       }
 
-      // optional bool isSelectionVector2 = 3;
+      // optional bool is_selection_vector_2 = 3;
       private boolean isSelectionVector2_ ;
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public boolean hasIsSelectionVector2() {
         return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public boolean getIsSelectionVector2() {
         return isSelectionVector2_;
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public Builder setIsSelectionVector2(boolean value) {
         bitField0_ |= 0x00000004;
@@ -3691,7 +3691,7 @@ public final class UserBitShared {
         return this;
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public Builder clearIsSelectionVector2() {
         bitField0_ = (bitField0_ & ~0x00000004);
@@ -4984,16 +4984,16 @@ public final class UserBitShared {
       "ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
       "ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
       "art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
-      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
+      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"p\n\016RecordBatch",
       "Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
-      "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
-      "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
-      "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
-      " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
-      "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
-      "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
-      "org.apache.drill.exec.protoB\rUserBitShar" +
-      "edH\001"
+      "tadata\022\024\n\014record_count\030\002 \001(\005\022\035\n\025is_selec" +
+      "tion_vector_2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n" +
+      "\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_coun" +
+      "t\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013grou" +
+      "p_count\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005" +
+      "child\030\006 \003(\0132\032.exec.shared.FieldMetadataB" +
+      ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+      "haredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 5bea284..0d98797 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,7 @@ message RecordBatch{
 message RecordBatchDef {
 	repeated FieldMetadata field = 1;
 	optional int32 record_count = 2;
-    optional bool isSelectionVector2 = 3;
+    optional bool is_selection_vector_2 = 3;
 
 }
 

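Worth noting: renaming isSelectionVector2 to is_selection_vector_2 changes
only the field's name, not its tag number (still 3) or type, so the binary
wire format stays compatible with batches serialized before the rename. Only
the generated constant (IS_SELECTION_VECTOR_2_FIELD_NUMBER) and protobuf
text-format output change; the Java accessors keep the same camel-case names,
as the regenerated code above shows.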

[05/10] git commit: DRILL-271 Release buffer in VectorAccessibleSerializable read. Also read directly from the stream.

Posted by ja...@apache.org.
DRILL-271 Release buffer in VectorAccessibleSerializable read. Also read directly from the stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/90c302de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/90c302de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/90c302de

Branch: refs/heads/master
Commit: 90c302def50ca74e7c4c2b2c40a75a8250f4df00
Parents: d529352
Author: Steven Phillips <sp...@maprtech.com>
Authored: Wed Oct 30 11:25:10 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:42 2013 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/cache/VectorAccessibleSerializable.java   | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c302de/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 24387d8..f4a6998 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -87,13 +87,12 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     List<FieldMetadata> fieldList = batchDef.getFieldList();
     for (FieldMetadata metaData : fieldList) {
       int dataLength = metaData.getBufferLength();
-      byte[] bytes = new byte[dataLength];
-      input.read(bytes);
       MaterializedField field = MaterializedField.create(metaData.getDef());
       ByteBuf buf = allocator.buffer(dataLength);
-      buf.setBytes(0, bytes);
+      buf.writeBytes(input, dataLength);
       ValueVector vector = TypeHelper.getNewVector(field, allocator);
       vector.load(metaData, buf);
+      buf.release();
       vectorList.add(vector);
     }
     container.addCollection(vectorList);

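Two things happen in this change: the intermediate byte[] copy disappears,
since buf.writeBytes(input, dataLength) pulls straight from the stream into
the allocated buffer, and the ByteBuf is released once vector.load() has
consumed it. The release is presumably safe because load() takes its own
reference to the buffer data it uses, leaving the vector as the owner;
without it, the allocation here would leak one reference per field.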

[07/10] git commit: DRILL-271 Address code review comments. VectorAccessibleSerializable now takes a WritableBatch. Release and reconstruct code is now part of the WritableBatch class.

Posted by ja...@apache.org.
DRILL-271 Address code review comments. VectorAccessibleSerializable now takes a WritableBatch. Release and reconstruct code is now part of the WritableBatch class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/b44b6c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/b44b6c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/b44b6c71

Branch: refs/heads/master
Commit: b44b6c719bb4cf7159ef0cb0c6ec172b3273f681
Parents: 0ac0b19
Author: Steven Phillips <sp...@maprtech.com>
Authored: Thu Oct 31 17:29:26 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Thu Oct 31 17:34:56 2013 -0700

----------------------------------------------------------------------
 .../cache/VectorAccessibleSerializable.java     | 181 ++++++-------------
 .../OrderedPartitionRecordBatch.java            |   6 +-
 .../physical/impl/trace/TraceRecordBatch.java   |  25 ++-
 .../apache/drill/exec/record/WritableBatch.java |  58 ++++++
 .../drill/exec/cache/TestVectorCache.java       |   8 +-
 .../drill/exec/cache/TestWriteToDisk.java       |   8 +-
 6 files changed, 133 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 62f8097..e5bb94b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -22,75 +22,87 @@ import com.google.common.collect.Lists;
 import com.yammer.metrics.MetricRegistry;
 import com.yammer.metrics.Timer;
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.common.util.DataInputInputStream;
 import org.apache.drill.common.util.DataOutputOutputStream;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.metrics.DrillMetrics;
-import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 
 import java.io.*;
 import java.util.List;
 
+/**
+ * A wrapper around a VectorAccessible. Will serialize a VectorAccessible and write to an OutputStream, or can read
+ * from an InputStream and construct a new VectorContainer.
+ */
 public class VectorAccessibleSerializable implements DrillSerializable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorAccessibleSerializable.class);
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
 
   private VectorAccessible va;
+  private WritableBatch batch;
   private BufferAllocator allocator;
   private int recordCount = -1;
   private BatchSchema.SelectionVectorMode svMode = BatchSchema.SelectionVectorMode.NONE;
   private SelectionVector2 sv2;
-  private int incomingRecordCount;
-  private VectorContainer retainedVectorContainer;
-  private SelectionVector2 retainedSelectionVector;
 
   private boolean retain = false;
 
-  /**
-   *
-   * @param va
-   */
-  public VectorAccessibleSerializable(VectorAccessible va, BufferAllocator allocator){
-    this.va = va;
+  public VectorAccessibleSerializable(BufferAllocator allocator) {
     this.allocator = allocator;
-    incomingRecordCount = va.getRecordCount();
+    this.va = new VectorContainer();
   }
 
-  public VectorAccessibleSerializable(VectorAccessible va, SelectionVector2 sv2, BufferAllocator allocator) {
-    this.va = va;
-    this.allocator = allocator;
-    this.sv2 = sv2;
-    if (sv2 != null) this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
-    incomingRecordCount = va.getRecordCount();
+  public VectorAccessibleSerializable(WritableBatch batch, BufferAllocator allocator){
+    this(batch, null, allocator);
   }
 
-  public VectorAccessibleSerializable(BufferAllocator allocator) {
-    this.va = new VectorContainer();
+  /**
+   * Creates a wrapper around batch and sv2 for writing to a stream. sv2 will never be released by this class;
+   * ownership is maintained by the caller.
+   * @param batch the WritableBatch to serialize
+   * @param sv2 the selection vector to write alongside the batch, or null if there is none
+   * @param allocator the BufferAllocator to use
+   */
+  public VectorAccessibleSerializable(WritableBatch batch, SelectionVector2 sv2, BufferAllocator allocator) {
     this.allocator = allocator;
+    if (batch != null) {
+      this.batch = batch;
+    }
+    if (sv2 != null) {
+      this.sv2 = sv2;
+      this.svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
   }
 
   @Override
   public void read(DataInput input) throws IOException {
     readFromStream(DataInputInputStream.constructInputStream(input));
   }
-  
+
+  /**
+   * Reads from an InputStream and parses a RecordBatchDef. From this, we construct a SelectionVector2 if one exists,
+   * then construct the vectors and add them to a vector container.
+   * @param input the InputStream to read from
+   * @throws IOException
+   */
   @Override
   public void readFromStream(InputStream input) throws IOException {
     VectorContainer container = new VectorContainer();
     UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
     recordCount = batchDef.getRecordCount();
     if (batchDef.hasIsSelectionVector2() && batchDef.getIsSelectionVector2()) {
-      sv2.allocateNew(recordCount * 2);
-      sv2.getBuffer().setBytes(0, input, recordCount * 2);
+      if (sv2 == null) {
+        sv2 = new SelectionVector2(allocator);
+      }
+      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
       svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
     }
     List<ValueVector> vectorList = Lists.newArrayList();
@@ -116,20 +128,27 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     writeToStream(DataOutputOutputStream.constructOutputStream(output));
   }
 
+  public void writeToStreamAndRetain(OutputStream output) throws IOException {
+    retain = true;
+    writeToStream(output);
+  }
+
+  /**
+   * Serializes the wrapped WritableBatch (and selection vector, if present) and writes it to an output stream.
+   * @param output the OutputStream to write to
+   * @throws IOException
+   */
   @Override
   public void writeToStream(OutputStream output) throws IOException {
     Preconditions.checkNotNull(output);
     final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
-    WritableBatch batch = WritableBatch.getBatchNoHVWrap(va.getRecordCount(),va,false);
 
     ByteBuf[] incomingBuffers = batch.getBuffers();
     UserBitShared.RecordBatchDef batchDef = batch.getDef();
 
         /* ByteBuf associated with the selection vector */
     ByteBuf svBuf = null;
-
-        /* Size of the selection vector */
-    int svCount = 0;
+    Integer svCount = null;
 
     if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
     {
@@ -137,8 +156,6 @@ public class VectorAccessibleSerializable implements DrillSerializable {
       svBuf = sv2.getBuffer();
     }
 
-    int totalBufferLength = 0;
-
     try
     {
             /* Write the metadata to the file */
@@ -147,42 +164,20 @@ public class VectorAccessibleSerializable implements DrillSerializable {
             /* If we have a selection vector, dump it to file first */
       if (svBuf != null)
       {
-
-                /* For writing to the selection vectors we use
-                 * setChar() method which does not modify the
-                 * reader and writer index. To copy the entire buffer
-                 * without having to get each byte individually we need
-                 * to set the writer index
-                 */
-        svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
-
-//        fc.write(svBuf.nioBuffers());
         svBuf.getBytes(0, output, svBuf.readableBytes());
-        if (!retain) {
-          svBuf.release();
-        }
+        sv2.setBuffer(svBuf);
+        sv2.setRecordCount(svCount);
       }
 
             /* Dump the array of ByteBuf's associated with the value vectors */
       for (ByteBuf buf : incomingBuffers)
       {
-                /* dump the buffer into the file channel */
+                /* dump the buffer into the OutputStream */
         int bufLength = buf.readableBytes();
         buf.getBytes(0, output, bufLength);
-
-                /* compute total length of buffer, will be used when
-                 * we create a compound buffer
-                 */
-        totalBufferLength += buf.readableBytes();
-        if (!retain) {
-          buf.release();
-        }
       }
 
       output.flush();
-      if (retain) {
-        reconstructRecordBatch(batchDef, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
-      }
 
       timerContext.stop();
     } catch (IOException e)
@@ -193,86 +188,16 @@ public class VectorAccessibleSerializable implements DrillSerializable {
     }
   }
 
-  private void reconstructRecordBatch(UserBitShared.RecordBatchDef batchDef,
-                                      ByteBuf[] vvBufs, int totalBufferLength,
-                                      ByteBuf svBuf, int svCount, BatchSchema.SelectionVectorMode svMode)
-  {
-    VectorContainer container = retainedVectorContainer;
-    if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
-    {
-
-      CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
-
-            /* Copy data from each buffer into the compound buffer */
-      for (int i = 0; i < vvBufs.length; i++)
-      {
-        cbb.addComponent(vvBufs[i]);
-      }
-
-      List<FieldMetadata> fields = batchDef.getFieldList();
-
-      int bufferOffset = 0;
-
-            /* For each value vector slice up the appropriate size from
-             * the compound buffer and load it into the value vector
-             */
-      int vectorIndex = 0;
-
-      for(VectorWrapper<?> vv : container)
-      {
-        FieldMetadata fmd = fields.get(vectorIndex);
-        ValueVector v = vv.getValueVector();
-        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
-        vectorIndex++;
-        bufferOffset += fmd.getBufferLength();
-      }
-    }
-
-        /* Set the selection vector for the record batch if the
-         * incoming batch had a selection vector
-         */
-    if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
-    {
-      if (sv2 == null)
-        sv2 = new SelectionVector2(allocator);
-
-      sv2.setRecordCount(svCount);
-
-            /* create our selection vector from the
-             * incoming selection vector's buffer
-             */
-      sv2.setBuffer(svBuf);
-
-      svBuf.release();
-    }
-
-    container.buildSchema(svMode);
-
-        /* Set the record count in the value vector */
-    for(VectorWrapper<?> v : container)
-    {
-      ValueVector.Mutator m = v.getValueVector().getMutator();
-      m.setValueCount(incomingRecordCount);
-    }
-    retainedVectorContainer = container;
-  }
-
   private void clear() {
+    if (!retain) {
+      batch.clear();
+    }
   }
 
   public VectorAccessible get() {
     return va;
   }
 
-  public void retain(VectorContainer container) {
-    this.retain = true;
-    this.retainedVectorContainer = container;
-  }
-
-  public VectorContainer getRetainedVectorContainer() {
-    return retainedVectorContainer;
-  }
-
   public SelectionVector2 getSv2() {
     return sv2;
   }
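
Reading a batch back follows the mirror-image path. A sketch only (the holder class and the source of the input stream are illustrative assumptions):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorAccessible;
    import org.apache.drill.exec.record.selection.SelectionVector2;

    class ReadSketch {  // hypothetical holder class, for illustration only
      static VectorAccessible readBatch(InputStream in, BufferAllocator allocator)
          throws IOException {
        // The allocator-only constructor starts with an empty VectorContainer to load into.
        VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(allocator);
        wrap.readFromStream(in);
        // getSv2() returns a selection vector only if the serialized batch carried one.
        SelectionVector2 sv2 = wrap.getSv2();
        return wrap.get();  // the reconstructed container
      }
    }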

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index e39b82e..317e705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -160,7 +160,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
         vectorList.add(vw.getValueVector());
       }
 
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(containerToCache, context.getDrillbitContext().getAllocator());
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
 
       mmap.put(mapKey, wrap);
       wrap = null;
@@ -238,8 +239,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
       vw.getValueVector().getMutator().setValueCount(copier2.getOutputRecords());
     }
     candidatePartitionTable.setRecordCount(copier2.getOutputRecords());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
 
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(candidatePartitionTable, context.getDrillbitContext().getAllocator());
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getDrillbitContext().getAllocator());
 
     tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 36c390c..b73ddc1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -116,28 +116,27 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
      * Function is invoked for every record batch and it simply
      * dumps the buffers associated with all the value vectors in
      * this record batch to a log file.
-     *
-     * Function is divided into three main parts
-     *   1. Get all the buffers(ByteBuf's) associated with incoming
-     *      record batch's value vectors and selection vector
-     *   2. Dump these buffers to the log file (performed by writeToFile())
-     *   3. Construct the record batch with these buffers to look exactly like
-     *      the incoming record batch (performed by reconstructRecordBatch())
      */
     @Override
     protected void doWork()
     {
-      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(incoming,
-            incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE ? incoming.getSelectionVector2() : null,
-            context.getAllocator());
-      wrap.retain(container);
+
+      boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+      if (incomingHasSv2) {
+        sv = incoming.getSelectionVector2();
+      } else {
+        sv = null;
+      }
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
+              incoming, incomingHasSv2);
+      VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
 
       try {
-        wrap.writeToStream(fos);
+        wrap.writeToStreamAndRetain(fos);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
-      sv = wrap.getSv2();
+      batch.reconstructContainer(container);
     }
 
     @Override
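
In isolation, the trace operator's new write-then-reconstruct contract looks roughly like this (a sketch; the holder class and parameter names are illustrative, with fos standing in for the operator's trace output stream):

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.WritableBatch;
    import org.apache.drill.exec.record.selection.SelectionVector2;

    class TraceSketch {  // hypothetical holder class, for illustration only
      static void traceAndPassThrough(WritableBatch batch, SelectionVector2 sv2,
                                      BufferAllocator allocator, OutputStream fos,
                                      VectorContainer container) throws IOException {
        VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv2, allocator);
        // Retaining keeps the underlying ByteBufs alive across the write...
        wrap.writeToStreamAndRetain(fos);
        // ...so the same batch can repopulate the outgoing container afterwards.
        batch.reconstructContainer(container);
      }
    }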

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 76b79db..a33ca37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -37,6 +38,7 @@ public class WritableBatch {
 
   private final RecordBatchDef def;
   private final ByteBuf[] buffers;
+  private boolean cleared = false;
 
   private WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
     logger.debug("Created new writable batch with def {} and buffers {}", def, buffers);
@@ -58,6 +60,62 @@ public class WritableBatch {
     return buffers;
   }
 
+  public void reconstructContainer(VectorContainer container)
+  {
+    Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+    if (buffers.length > 0)    /* If we have ByteBuf's associated with value vectors */
+    {
+
+      CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
+
+            /* Copy data from each buffer into the compound buffer */
+      for (ByteBuf buf : buffers)
+      {
+        cbb.addComponent(buf);
+      }
+
+      List<FieldMetadata> fields = def.getFieldList();
+
+      int bufferOffset = 0;
+
+            /* For each value vector slice up the appropriate size from
+             * the compound buffer and load it into the value vector
+             */
+      int vectorIndex = 0;
+
+      for(VectorWrapper<?> vv : container)
+      {
+        FieldMetadata fmd = fields.get(vectorIndex);
+        ValueVector v = vv.getValueVector();
+        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+        vectorIndex++;
+        bufferOffset += fmd.getBufferLength();
+      }
+    }
+
+    SelectionVectorMode svMode;
+    if (def.hasIsSelectionVector2() && def.getIsSelectionVector2()) {
+      svMode = SelectionVectorMode.TWO_BYTE;
+    } else {
+      svMode = SelectionVectorMode.NONE;
+    }
+    container.buildSchema(svMode);
+
+        /* Set the record count in the value vector */
+    for(VectorWrapper<?> v : container)
+    {
+      ValueVector.Mutator m = v.getValueVector().getMutator();
+      m.setValueCount(def.getRecordCount());
+    }
+  }
+
+  public void clear() {
+    for (ByteBuf buf : buffers) {
+      buf.release();
+    }
+    cleared = true;
+  }
+
   public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
     List<ValueVector> vectors = Lists.newArrayList();
     for(VectorWrapper<?> vw : vws){
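
After this patch a WritableBatch has two terminal choices, sketched below with illustrative names (not part of the patch): either its buffers are loaded back into a container, or they are released.

    import org.apache.drill.exec.record.VectorContainer;
    import org.apache.drill.exec.record.WritableBatch;

    class LifecycleSketch {  // hypothetical holder class, for illustration only
      static void finish(WritableBatch batch, VectorContainer container, boolean passThrough) {
        if (passThrough) {
          // Rebuild the container's vectors from the batch's buffers.
          batch.reconstructContainer(container);
        } else {
          // Release the underlying ByteBufs; calling reconstructContainer()
          // afterwards would trip the checkState precondition above.
          batch.clear();
        }
      }
    }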

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
index 94aa3dd..cf274c1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestVectorCache.java
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -68,7 +65,8 @@ public class TestVectorCache {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
 
     DistributedMultiMap<VectorAccessibleSerializable> mmap = cache.getMultiMap(VectorAccessibleSerializable.class);
     mmap.put("vectors", wrap);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/b44b6c71/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
index 11d15d8..fb3e821 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestWriteToDisk.java
@@ -24,10 +24,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.*;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.RemoteServiceSet;
@@ -74,7 +71,8 @@ public class TestWriteToDisk {
     VectorContainer container = new VectorContainer();
     container.addCollection(vectorList);
     container.setRecordCount(4);
-    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(container, context.getAllocator());
+    WritableBatch batch = WritableBatch.getBatchNoHVWrap(container.getRecordCount(), container, false);
+    VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, context.getAllocator());
 
     Configuration conf = new Configuration();
     conf.set("fs.name.default", "file:///");