You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:19 UTC

[02/10] git commit: Addressed review comments from Jacques

Addressed review comments from Jacques


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/30ada5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/30ada5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/30ada5de

Branch: refs/heads/master
Commit: 30ada5decc9ab92fb95d075f2d01ab1387ce8c22
Parents: 6c78890
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 23:19:36 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:19 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/physical/impl/TraceInjector.java |   2 +-
 .../physical/impl/trace/TraceRecordBatch.java   | 281 ++++++++++++-------
 .../exec/record/AbstractSingleRecordBatch.java  |   2 +-
 .../apache/drill/exec/proto/UserBitShared.java  |  42 +--
 protocol/src/main/protobuf/UserBitShared.proto  |   2 +-
 5 files changed, 199 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 3e82a73..9c859a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -74,7 +74,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
         /* For every child operator create a trace operator as its parent */
         for (int i = 0; i < newChildren.size(); i++)
         {
-            String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+            String traceTag = newChildren.get(i).toString() + Integer.toString(traceTagCount++);
 
             /* Trace operator */
             Trace traceOp = new Trace(newChildren.get(i), traceTag);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 97131cd..d2828cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -24,7 +24,10 @@ import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Formatter;
 
+import com.google.common.collect.Iterators;
+import io.netty.buffer.CompositeByteBuf;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -43,6 +46,19 @@ import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
 
 import io.netty.buffer.ByteBuf;
 
+/* TraceRecordBatch contains value vectors which are exactly the same
+ * as the incoming record batch's value vectors. If the incoming
+ * record batch has a selection vector (type 2) then TraceRecordBatch
+ * will also contain a selection vector.
+ *
+ * The purpose of this record batch is to dump the data associated with all
+ * the value vectors and selection vector to disk.
+ *
+ * This record batch does not modify any data or schema; it simply
+ * consumes the incoming record batch's data, dumps it to disk and passes the
+ * same set of value vectors (and selection vectors) to its parent record
+ * batch.
+ */
 public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
 {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
@@ -55,11 +71,33 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     /* Location where the log should be dumped */
     private final String logLocation;
 
+    /* File descriptors needed to be able to dump to log file */
+    private FileOutputStream fos;
+    private FileChannel fc;
+
     public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
     {
         super(pop, context, incoming);
         this.traceTag = pop.traceTag;
         logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+        String fileName = getFileName();
+
+        /* Create the log file we will dump to and initialize the file descriptors */
+        try
+        {
+            File file = new File(fileName);
+
+            /* create the file */
+            file.createNewFile();
+
+            fos = new FileOutputStream(file, true);
+            fc  = fos.getChannel();
+
+        } catch (IOException e)
+        {
+            logger.error("Unable to create file: " + fileName);
+        }
     }
 
     @Override
@@ -75,6 +113,13 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
      * Function is invoked for every record batch and it simply
      * dumps the buffers associated with all the value vectors in
      * this record batch to a log file.
+     *
+     * The function is divided into three main parts:
+     *   1. Get all the buffers (ByteBufs) associated with the incoming
+     *      record batch's value vectors and selection vector
+     *   2. Dump these buffers to the log file (performed by writeToFile())
+     *   3. Construct the record batch with these buffers to look exactly like
+     *      the incoming record batch (performed by reconstructRecordBatch())
      */
     @Override
     protected void doWork()
@@ -85,37 +130,87 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
         /* Get the array of buffers from the incoming record batch */
         WritableBatch batch = incoming.getWritableBatch();
 
-        BufferAllocator allocator = context.getAllocator();
         ByteBuf[] incomingBuffers = batch.getBuffers();
         RecordBatchDef batchDef = batch.getDef();
 
-        /* Total length of buffers across all value vectors */
-        int totalBufferLength = 0;
+        /* ByteBuf associated with the selection vector */
+        ByteBuf svBuf = null;
 
-        String fileName = getFileName();
+        /* Size of the selection vector */
+        int svCount = 0;
 
-        try
+        if (svMode == SelectionVectorMode.TWO_BYTE)
         {
-            File file = new File(fileName);
+            SelectionVector2 sv2 = incoming.getSelectionVector2();
+            svCount = sv2.getCount();
+            svBuf = sv2.getBuffer();
+        }
 
-            if (!file.exists())
-                file.createNewFile();
+        /* Write the ByteBufs for the value vectors and selection vector to disk;
+         * totalBufferLength is the sum of the sizes of all the ByteBufs across all value vectors
+         */
+        int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
 
-            FileOutputStream fos = new FileOutputStream(file, true);
+        /* Reconstruct the record batch from the ByteBuf's */
+        reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+    }
+
+    @Override
+    protected void setupNewSchema() throws SchemaChangeException
+    {
+        /* Trace operator does not deal with hyper vectors yet */
+        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+        /* we have a new schema, clear our existing container to
+         * load the new value vectors
+         */
+        container.clear();
 
+        /* Add all the value vectors in the container */
+        for(VectorWrapper<?> vv : incoming)
+        {
+            TransferPair tp = vv.getValueVector().getTransferPair();
+            container.add(tp.getTo());
+        }
+    }
+
+    @Override
+    public SelectionVector2 getSelectionVector2() {
+        return sv;
+    }
+
+    private String getFileName()
+    {
+        /* From the context, get the query id, major fragment id,
+         * minor fragment id. This will be used as the file name
+         * to which we will dump the incoming buffer data
+         */
+        FragmentHandle handle = incoming.getContext().getHandle();
+
+        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+        int majorFragmentId = handle.getMajorFragmentId();
+        int minorFragmentId = handle.getMinorFragmentId();
+
+        String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+
+        return fileName;
+    }
+
+    private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
+    {
+        String fileName = getFileName();
+        int totalBufferLength = 0;
+
+        try
+        {
             /* Write the metadata to the file */
             batchDef.writeDelimitedTo(fos);
 
-            FileChannel fc = fos.getChannel();
-
             /* If we have a selection vector, dump it to file first */
-            if (svMode == SelectionVectorMode.TWO_BYTE)
+            if (svBuf != null)
             {
-                SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
-                int recordCount = incomingSV2.getCount();
-                int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
-
-                ByteBuf buf = incomingSV2.getBuffer();
 
                 /* For writing to the selection vectors we use
                  * setChar() method which does not modify the
@@ -123,32 +218,16 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
                  * without having to get each byte individually we need
                  * to set the writer index
                  */
-                buf.writerIndex(sv2Size);
-
-                /* dump the selection vector to log */
-                dumpByteBuf(fc, buf);
-
-                if (sv == null)
-                    sv = new SelectionVector2(allocator);
+                svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
 
-                sv.setRecordCount(recordCount);
-
-                /* create our selection vector from the
-                 * incoming selection vector's buffer
-                 */
-                sv.setBuffer(buf);
-
-                buf.release();
+                fc.write(svBuf.nioBuffers());
             }
 
-            /* For each buffer dump it to log and compute total length */
-            for (ByteBuf buf : incomingBuffers)
+            /* Dump the array of ByteBuf's associated with the value vectors */
+            for (ByteBuf buf : vvBufs)
             {
                 /* dump the buffer into the file channel */
-                dumpByteBuf(fc, buf);
-
-                /* Reset reader index on the ByteBuf so we can read it again */
-                buf.resetReaderIndex();
+                fc.write(buf.nioBuffers());
 
                 /* compute total length of buffer, will be used when
                  * we create a compound buffer
@@ -156,41 +235,66 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
                 totalBufferLength += buf.readableBytes();
             }
 
-            fc.close();
-            fos.close();
+            fc.force(true);
+            fos.flush();
 
         } catch (IOException e)
         {
             logger.error("Unable to write buffer to file: " + fileName);
         }
 
-        /* allocate memory for the compound buffer, compound buffer
-         * will contain the data from all the buffers across all the
-         * value vectors
-         */
-        ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+        return totalBufferLength;
+    }
 
-        /* Copy data from each buffer into the compound buffer */
-        for (int i = 0; i < incomingBuffers.length; i++)
+    private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
+                                        ByteBuf[] vvBufs, int totalBufferLength,
+                                        ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
+    {
+        if (vvBufs.length > 0)    /* If we have ByteBuf's associated with value vectors */
         {
-            byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
-        }
+            CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+            /* Add each buffer as a component of the composite buffer (no data is copied) */
+            for (int i = 0; i < vvBufs.length; i++)
+            {
+                cbb.addComponent(vvBufs[i]);
+            }
 
-        List<FieldMetadata> fields = batchDef.getFieldList();
+            List<FieldMetadata> fields = batchDef.getFieldList();
 
-        int bufferOffset = 0;
+            int bufferOffset = 0;
 
-        /* For each value vector slice up the appropriate size from
-         * the compound buffer and load it into the value vector
+            /* For each value vector slice up the appropriate size from
+             * the compound buffer and load it into the value vector
+             */
+            int vectorIndex = 0;
+
+            for(VectorWrapper<?> vv : container)
+            {
+                FieldMetadata fmd = fields.get(vectorIndex);
+                ValueVector v = vv.getValueVector();
+                v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+                vectorIndex++;
+                bufferOffset += fmd.getBufferLength();
+            }
+        }
+
+        /* Set the selection vector for the record batch if the
+         * incoming batch had a selection vector
          */
-        int vectorIndex = 0;
-        for(VectorWrapper<?> vv : container)
+        if (svMode == SelectionVectorMode.TWO_BYTE)
         {
-            FieldMetadata fmd = fields.get(vectorIndex);
-            ValueVector v = vv.getValueVector();
-            v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
-            vectorIndex++;
-            bufferOffset += fmd.getBufferLength();
+            if (sv == null)
+                sv = new SelectionVector2(context.getAllocator());
+
+            sv.setRecordCount(svCount);
+
+            /* create our selection vector from the
+             * incoming selection vector's buffer
+             */
+            sv.setBuffer(svBuf);
+
+            svBuf.release();
         }
 
         container.buildSchema(svMode);
@@ -204,56 +308,21 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
     }
 
     @Override
-    protected void setupNewSchema() throws SchemaChangeException
+    protected void cleanup()
     {
-        /* Trace operator does not deal with hyper vectors yet */
-        if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
-            throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
-        /* we have a new schema, clear our existing container to
-         * load the new value vectors
-         */
-        container.clear();
+        /* Release the selection vector */
+        if (sv != null)
+          sv.clear();
 
-        /* Add all the value vectors in the container */
-        for(VectorWrapper<?> vv : incoming)
+        /* Close the file descriptors */
+        try
         {
-            TransferPair tp = vv.getValueVector().getTransferPair();
-            container.add(tp.getTo());
+            fos.close();
+            fc.close();
+        } catch (IOException e)
+        {
+            logger.error("Unable to close file descriptors for file: " + getFileName());
         }
     }
 
-    @Override
-    public SelectionVector2 getSelectionVector2() {
-        return sv;
-    }
-
-    private String getFileName()
-    {
-        /* From the context, get the query id, major fragment id,
-         * minor fragment id. This will be used as the file name
-         * to which we will dump the incoming buffer data
-         */
-        FragmentHandle handle = incoming.getContext().getHandle();
-
-        String qid = QueryIdHelper.getQueryId(handle.getQueryId());
-
-        int majorFragmentId = handle.getMajorFragmentId();
-        int minorFragmentId = handle.getMinorFragmentId();
-
-        return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
-    }
-
-    private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
-    {
-        int bufferLength = buf.readableBytes();
-
-        byte[] byteArray = new byte[bufferLength];
-
-        /* Transfer bytes to a byte array */
-        buf.readBytes(byteArray);
-
-        /* Drop the byte array into the file channel */
-        fc.write(ByteBuffer.wrap(byteArray));
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 25284b6..acc6c9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
       throw new UnsupportedOperationException();
     }
   }
-  
+
   protected abstract void setupNewSchema() throws SchemaChangeException;
   protected abstract void doWork();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index f305c00..60dd6fd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2910,13 +2910,13 @@ public final class UserBitShared {
      */
     int getRecordCount();
 
-    // optional bool isSelectionVector2 = 3;
+    // optional bool is_selection_vector_2 = 3;
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     boolean hasIsSelectionVector2();
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     boolean getIsSelectionVector2();
   }
@@ -3084,17 +3084,17 @@ public final class UserBitShared {
       return recordCount_;
     }
 
-    // optional bool isSelectionVector2 = 3;
-    public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+    // optional bool is_selection_vector_2 = 3;
+    public static final int IS_SELECTION_VECTOR_2_FIELD_NUMBER = 3;
     private boolean isSelectionVector2_;
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     public boolean hasIsSelectionVector2() {
       return ((bitField0_ & 0x00000002) == 0x00000002);
     }
     /**
-     * <code>optional bool isSelectionVector2 = 3;</code>
+     * <code>optional bool is_selection_vector_2 = 3;</code>
      */
     public boolean getIsSelectionVector2() {
       return isSelectionVector2_;
@@ -3667,22 +3667,22 @@ public final class UserBitShared {
         return this;
       }
 
-      // optional bool isSelectionVector2 = 3;
+      // optional bool is_selection_vector_2 = 3;
       private boolean isSelectionVector2_ ;
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public boolean hasIsSelectionVector2() {
         return ((bitField0_ & 0x00000004) == 0x00000004);
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public boolean getIsSelectionVector2() {
         return isSelectionVector2_;
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public Builder setIsSelectionVector2(boolean value) {
         bitField0_ |= 0x00000004;
@@ -3691,7 +3691,7 @@ public final class UserBitShared {
         return this;
       }
       /**
-       * <code>optional bool isSelectionVector2 = 3;</code>
+       * <code>optional bool is_selection_vector_2 = 3;</code>
        */
       public Builder clearIsSelectionVector2() {
         bitField0_ = (bitField0_ & ~0x00000004);
@@ -4984,16 +4984,16 @@ public final class UserBitShared {
       "ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
       "ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
       "art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
-      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
+      "_row\030\005 \001(\005\"\r\n\013RecordBatch\"p\n\016RecordBatch",
       "Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
-      "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
-      "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
-      "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
-      " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
-      "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
-      "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
-      "org.apache.drill.exec.protoB\rUserBitShar" +
-      "edH\001"
+      "tadata\022\024\n\014record_count\030\002 \001(\005\022\035\n\025is_selec" +
+      "tion_vector_2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n" +
+      "\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_coun" +
+      "t\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013grou" +
+      "p_count\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005" +
+      "child\030\006 \003(\0132\032.exec.shared.FieldMetadataB" +
+      ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+      "haredH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 5bea284..0d98797 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,7 @@ message RecordBatch{
 message RecordBatchDef {
 	repeated FieldMetadata field = 1;
 	optional int32 record_count = 2;
-    optional bool isSelectionVector2 = 3;
+    optional bool is_selection_vector_2 = 3;
 
 }