You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:19 UTC
[02/10] git commit: Addressed review comments from Jacques
Addressed review comments from Jacques
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/30ada5de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/30ada5de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/30ada5de
Branch: refs/heads/master
Commit: 30ada5decc9ab92fb95d075f2d01ab1387ce8c22
Parents: 6c78890
Author: Mehant Baid <me...@github.com>
Authored: Wed Oct 23 23:19:36 2013 -0700
Committer: Steven Phillips <sp...@maprtech.com>
Committed: Wed Oct 30 19:02:19 2013 -0700
----------------------------------------------------------------------
.../drill/exec/physical/impl/TraceInjector.java | 2 +-
.../physical/impl/trace/TraceRecordBatch.java | 281 ++++++++++++-------
.../exec/record/AbstractSingleRecordBatch.java | 2 +-
.../apache/drill/exec/proto/UserBitShared.java | 42 +--
protocol/src/main/protobuf/UserBitShared.proto | 2 +-
5 files changed, 199 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
index 3e82a73..9c859a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TraceInjector.java
@@ -74,7 +74,7 @@ public class TraceInjector extends AbstractPhysicalVisitor<PhysicalOperator, Fra
/* For every child operator create a trace operator as its parent */
for (int i = 0; i < newChildren.size(); i++)
{
- String traceTag = newChildren.toString() + Integer.toString(traceTagCount++);
+ String traceTag = newChildren.get(i).toString() + Integer.toString(traceTagCount++);
/* Trace operator */
Trace traceOp = new Trace(newChildren.get(i), traceTag);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 97131cd..d2828cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -24,7 +24,10 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Formatter;
+import com.google.common.collect.Iterators;
+import io.netty.buffer.CompositeByteBuf;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -43,6 +46,19 @@ import org.apache.drill.exec.proto.SchemaDefProtos.FieldDef;
import io.netty.buffer.ByteBuf;
+/* TraceRecordBatch contains value vectors which are exactly the same
+ * as the incoming record batch's value vectors. If the incoming
+ * record batch has a selection vector (type 2) then TraceRecordBatch
+ * will also contain a selection vector.
+ *
+ * Purpose of this record batch is to dump the data associated with all
+ * the value vectors and selection vector to disk.
+ *
+ * This record batch does not modify any data or schema; it simply
+ * consumes the incoming record batch's data, dumps it to disk and passes
+ * the same set of value vectors (and selection vector, if any) to its
+ * parent record batch.
+ */
public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
@@ -55,11 +71,33 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Location where the log should be dumped */
private final String logLocation;
+ /* File descriptors needed to be able to dump to log file */
+ private FileOutputStream fos;
+ private FileChannel fc;
+
public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
{
super(pop, context, incoming);
this.traceTag = pop.traceTag;
logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ String fileName = getFileName();
+
+ /* Create the log file we will dump to and initialize the file descriptors */
+ try
+ {
+ File file = new File(fileName);
+
+ /* create the file */
+ file.createNewFile();
+
+ fos = new FileOutputStream(file, true);
+ fc = fos.getChannel();
+
+ } catch (IOException e)
+ {
+ logger.error("Unable to create file: " + fileName);
+ }
}
@Override
@@ -75,6 +113,13 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
* Function is invoked for every record batch and it simply
* dumps the buffers associated with all the value vectors in
* this record batch to a log file.
+ *
+ * Function is divided into three main parts
+ * 1. Get all the buffers(ByteBuf's) associated with incoming
+ * record batch's value vectors and selection vector
+ * 2. Dump these buffers to the log file (performed by writeToFile())
+ * 3. Construct the record batch with these buffers to look exactly like
+ * the incoming record batch (performed by reconstructRecordBatch())
*/
@Override
protected void doWork()
@@ -85,37 +130,87 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
/* Get the array of buffers from the incoming record batch */
WritableBatch batch = incoming.getWritableBatch();
- BufferAllocator allocator = context.getAllocator();
ByteBuf[] incomingBuffers = batch.getBuffers();
RecordBatchDef batchDef = batch.getDef();
- /* Total length of buffers across all value vectors */
- int totalBufferLength = 0;
+ /* ByteBuf associated with the selection vector */
+ ByteBuf svBuf = null;
- String fileName = getFileName();
+ /* Size of the selection vector */
+ int svCount = 0;
- try
+ if (svMode == SelectionVectorMode.TWO_BYTE)
{
- File file = new File(fileName);
+ SelectionVector2 sv2 = incoming.getSelectionVector2();
+ svCount = sv2.getCount();
+ svBuf = sv2.getBuffer();
+ }
- if (!file.exists())
- file.createNewFile();
+ /* Write the ByteBufs for the value vectors and the selection vector (if any) to disk.
+ * totalBufferLength is the sum of the readable bytes of all value vector ByteBufs.
+ */
+ int totalBufferLength = writeToFile(batchDef, incomingBuffers, svBuf, svCount);
- FileOutputStream fos = new FileOutputStream(file, true);
+ /* Reconstruct the record batch from the ByteBuf's */
+ reconstructRecordBatch(batchDef, context, incomingBuffers, totalBufferLength, svBuf, svCount, svMode);
+ }
+
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException
+ {
+ /* Trace operator does not deal with hyper vectors yet */
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+ /* we have a new schema, clear our existing container to
+ * load the new value vectors
+ */
+ container.clear();
+ /* Add all the value vectors in the container */
+ for(VectorWrapper<?> vv : incoming)
+ {
+ TransferPair tp = vv.getValueVector().getTransferPair();
+ container.add(tp.getTo());
+ }
+ }
+
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
+
+ private String getFileName()
+ {
+ /* From the context, get the query id, major fragment id,
+ * minor fragment id. This will be used as the file name
+ * to which we will dump the incoming buffer data
+ */
+ FragmentHandle handle = incoming.getContext().getHandle();
+
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
+
+ String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+
+ return fileName;
+ }
+
+ private int writeToFile(RecordBatchDef batchDef, ByteBuf[] vvBufs, ByteBuf svBuf, int svCount)
+ {
+ String fileName = getFileName();
+ int totalBufferLength = 0;
+
+ try
+ {
/* Write the metadata to the file */
batchDef.writeDelimitedTo(fos);
- FileChannel fc = fos.getChannel();
-
/* If we have a selection vector, dump it to file first */
- if (svMode == SelectionVectorMode.TWO_BYTE)
+ if (svBuf != null)
{
- SelectionVector2 incomingSV2 = incoming.getSelectionVector2();
- int recordCount = incomingSV2.getCount();
- int sv2Size = recordCount * SelectionVector2.RECORD_SIZE;
-
- ByteBuf buf = incomingSV2.getBuffer();
/* For writing to the selection vectors we use
* setChar() method which does not modify the
@@ -123,32 +218,16 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
* without having to get each byte individually we need
* to set the writer index
*/
- buf.writerIndex(sv2Size);
-
- /* dump the selection vector to log */
- dumpByteBuf(fc, buf);
-
- if (sv == null)
- sv = new SelectionVector2(allocator);
+ svBuf.writerIndex(svCount * SelectionVector2.RECORD_SIZE);
- sv.setRecordCount(recordCount);
-
- /* create our selection vector from the
- * incoming selection vector's buffer
- */
- sv.setBuffer(buf);
-
- buf.release();
+ fc.write(svBuf.nioBuffers());
}
- /* For each buffer dump it to log and compute total length */
- for (ByteBuf buf : incomingBuffers)
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (ByteBuf buf : vvBufs)
{
/* dump the buffer into the file channel */
- dumpByteBuf(fc, buf);
-
- /* Reset reader index on the ByteBuf so we can read it again */
- buf.resetReaderIndex();
+ fc.write(buf.nioBuffers());
/* compute total length of buffer, will be used when
* we create a compound buffer
@@ -156,41 +235,66 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
totalBufferLength += buf.readableBytes();
}
- fc.close();
- fos.close();
+ fc.force(true);
+ fos.flush();
} catch (IOException e)
{
logger.error("Unable to write buffer to file: " + fileName);
}
- /* allocate memory for the compound buffer, compound buffer
- * will contain the data from all the buffers across all the
- * value vectors
- */
- ByteBuf byteBuf = allocator.buffer(totalBufferLength);
+ return totalBufferLength;
+ }
- /* Copy data from each buffer into the compound buffer */
- for (int i = 0; i < incomingBuffers.length; i++)
+ private void reconstructRecordBatch(RecordBatchDef batchDef, FragmentContext context,
+ ByteBuf[] vvBufs, int totalBufferLength,
+ ByteBuf svBuf, int svCount, SelectionVectorMode svMode)
+ {
+ if (vvBufs.length > 0) /* If we have ByteBuf's associated with value vectors */
{
- byteBuf.writeBytes(incomingBuffers[i], incomingBuffers[i].readableBytes());
- }
+ CompositeByteBuf cbb = new CompositeByteBuf(vvBufs[0].alloc(), true, vvBufs.length);
+
+ /* Add each value vector buffer as a component of the composite buffer */
+ for (int i = 0; i < vvBufs.length; i++)
+ {
+ cbb.addComponent(vvBufs[i]);
+ }
- List<FieldMetadata> fields = batchDef.getFieldList();
+ List<FieldMetadata> fields = batchDef.getFieldList();
- int bufferOffset = 0;
+ int bufferOffset = 0;
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
+ /* For each value vector slice up the appropriate size from
+ * the compound buffer and load it into the value vector
+ */
+ int vectorIndex = 0;
+
+ for(VectorWrapper<?> vv : container)
+ {
+ FieldMetadata fmd = fields.get(vectorIndex);
+ ValueVector v = vv.getValueVector();
+ v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ vectorIndex++;
+ bufferOffset += fmd.getBufferLength();
+ }
+ }
+
+ /* Set the selection vector for the record batch if the
+ * incoming batch had a selection vector
*/
- int vectorIndex = 0;
- for(VectorWrapper<?> vv : container)
+ if (svMode == SelectionVectorMode.TWO_BYTE)
{
- FieldMetadata fmd = fields.get(vectorIndex);
- ValueVector v = vv.getValueVector();
- v.load(fmd, byteBuf.slice(bufferOffset, fmd.getBufferLength()));
- vectorIndex++;
- bufferOffset += fmd.getBufferLength();
+ if (sv == null)
+ sv = new SelectionVector2(context.getAllocator());
+
+ sv.setRecordCount(svCount);
+
+ /* create our selection vector from the
+ * incoming selection vector's buffer
+ */
+ sv.setBuffer(svBuf);
+
+ svBuf.release();
}
container.buildSchema(svMode);
@@ -204,56 +308,21 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
}
@Override
- protected void setupNewSchema() throws SchemaChangeException
+ protected void cleanup()
{
- /* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
- /* we have a new schema, clear our existing container to
- * load the new value vectors
- */
- container.clear();
+ /* Release the selection vector */
+ if (sv != null)
+ sv.clear();
- /* Add all the value vectors in the container */
- for(VectorWrapper<?> vv : incoming)
+ /* Close the file descriptors */
+ try
{
- TransferPair tp = vv.getValueVector().getTransferPair();
- container.add(tp.getTo());
+ fos.close();
+ fc.close();
+ } catch (IOException e)
+ {
+ logger.error("Unable to close file descriptors for file: " + getFileName());
}
}
- @Override
- public SelectionVector2 getSelectionVector2() {
- return sv;
- }
-
- private String getFileName()
- {
- /* From the context, get the query id, major fragment id,
- * minor fragment id. This will be used as the file name
- * to which we will dump the incoming buffer data
- */
- FragmentHandle handle = incoming.getContext().getHandle();
-
- String qid = QueryIdHelper.getQueryId(handle.getQueryId());
-
- int majorFragmentId = handle.getMajorFragmentId();
- int minorFragmentId = handle.getMinorFragmentId();
-
- return new String(logLocation + "/" + traceTag + "_" + qid + "_" + majorFragmentId + "_" + minorFragmentId);
- }
-
- private void dumpByteBuf(FileChannel fc, ByteBuf buf) throws IOException
- {
- int bufferLength = buf.readableBytes();
-
- byte[] byteArray = new byte[bufferLength];
-
- /* Transfer bytes to a byte array */
- buf.readBytes(byteArray);
-
- /* Drop the byte array into the file channel */
- fc.write(ByteBuffer.wrap(byteArray));
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 25284b6..acc6c9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
throw new UnsupportedOperationException();
}
}
-
+
protected abstract void setupNewSchema() throws SchemaChangeException;
protected abstract void doWork();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
----------------------------------------------------------------------
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index f305c00..60dd6fd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -2910,13 +2910,13 @@ public final class UserBitShared {
*/
int getRecordCount();
- // optional bool isSelectionVector2 = 3;
+ // optional bool is_selection_vector_2 = 3;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
boolean hasIsSelectionVector2();
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
boolean getIsSelectionVector2();
}
@@ -3084,17 +3084,17 @@ public final class UserBitShared {
return recordCount_;
}
- // optional bool isSelectionVector2 = 3;
- public static final int ISSELECTIONVECTOR2_FIELD_NUMBER = 3;
+ // optional bool is_selection_vector_2 = 3;
+ public static final int IS_SELECTION_VECTOR_2_FIELD_NUMBER = 3;
private boolean isSelectionVector2_;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean hasIsSelectionVector2() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean getIsSelectionVector2() {
return isSelectionVector2_;
@@ -3667,22 +3667,22 @@ public final class UserBitShared {
return this;
}
- // optional bool isSelectionVector2 = 3;
+ // optional bool is_selection_vector_2 = 3;
private boolean isSelectionVector2_ ;
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean hasIsSelectionVector2() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public boolean getIsSelectionVector2() {
return isSelectionVector2_;
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public Builder setIsSelectionVector2(boolean value) {
bitField0_ |= 0x00000004;
@@ -3691,7 +3691,7 @@ public final class UserBitShared {
return this;
}
/**
- * <code>optional bool isSelectionVector2 = 3;</code>
+ * <code>optional bool is_selection_vector_2 = 3;</code>
*/
public Builder clearIsSelectionVector2() {
bitField0_ = (bitField0_ & ~0x00000004);
@@ -4984,16 +4984,16 @@ public final class UserBitShared {
"ror\030\005 \003(\0132\031.exec.shared.ParsingError\"\\\n\014" +
"ParsingError\022\024\n\014start_column\030\002 \001(\005\022\021\n\tst" +
"art_row\030\003 \001(\005\022\022\n\nend_column\030\004 \001(\005\022\017\n\007end" +
- "_row\030\005 \001(\005\"\r\n\013RecordBatch\"m\n\016RecordBatch",
+ "_row\030\005 \001(\005\"\r\n\013RecordBatch\"p\n\016RecordBatch",
"Def\022)\n\005field\030\001 \003(\0132\032.exec.shared.FieldMe" +
- "tadata\022\024\n\014record_count\030\002 \001(\005\022\032\n\022isSelect" +
- "ionVector2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n\003de" +
- "f\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_count\030\002" +
- " \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013group_c" +
- "ount\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005chi" +
- "ld\030\006 \003(\0132\032.exec.shared.FieldMetadataB.\n\033" +
- "org.apache.drill.exec.protoB\rUserBitShar" +
- "edH\001"
+ "tadata\022\024\n\014record_count\030\002 \001(\005\022\035\n\025is_selec" +
+ "tion_vector_2\030\003 \001(\010\"\261\001\n\rFieldMetadata\022\033\n" +
+ "\003def\030\001 \001(\0132\016.exec.FieldDef\022\023\n\013value_coun" +
+ "t\030\002 \001(\005\022\027\n\017var_byte_length\030\003 \001(\005\022\023\n\013grou" +
+ "p_count\030\004 \001(\005\022\025\n\rbuffer_length\030\005 \001(\005\022)\n\005" +
+ "child\030\006 \003(\0132\032.exec.shared.FieldMetadataB" +
+ ".\n\033org.apache.drill.exec.protoB\rUserBitS" +
+ "haredH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/30ada5de/protocol/src/main/protobuf/UserBitShared.proto
----------------------------------------------------------------------
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index 5bea284..0d98797 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -34,7 +34,7 @@ message RecordBatch{
message RecordBatchDef {
repeated FieldMetadata field = 1;
optional int32 record_count = 2;
- optional bool isSelectionVector2 = 3;
+ optional bool is_selection_vector_2 = 3;
}