Posted to dev@drill.apache.org by ja...@apache.org on 2013/11/09 01:56:25 UTC
[08/10] git commit: Simple reformatting
Simple reformatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/8a571b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/8a571b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/8a571b3e
Branch: refs/heads/master
Commit: 8a571b3ea04f439b3303d7154b1450abcb2bf320
Parents: b44b6c7
Author: Jacques Nadeau <ja...@apache.org>
Authored: Fri Nov 8 09:20:28 2013 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Fri Nov 8 09:20:28 2013 -0800
----------------------------------------------------------------------
.../physical/impl/trace/TraceBatchCreator.java | 16 +-
.../physical/impl/trace/TraceRecordBatch.java | 224 +++++++++----------
.../apache/drill/exec/record/WritableBatch.java | 52 +++--
3 files changed, 138 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index e857c25..a24ec70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -29,13 +29,13 @@ import com.google.common.base.Preconditions;
import java.util.List;
public class TraceBatchCreator implements BatchCreator<Trace> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
- @Override
- public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException {
- //Preconditions.checkArgument(children.size() == 1);
- return new TraceRecordBatch(config, children.iterator().next(), context);
- }
-
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+ throws ExecutionSetupException {
+ // Preconditions.checkArgument(children.size() == 1);
+ return new TraceRecordBatch(config, children.iterator().next(), context);
+ }
}
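
For reference, a minimal sketch of how a BatchCreator like the one above is invoked; the context, config and children here are assumed placeholders supplied by the fragment executor that materializes the physical plan, not part of this commit:

    // Hypothetical caller; context, config and children are assumed to be
    // provided by the executor walking the physical plan.
    TraceBatchCreator creator = new TraceBatchCreator();
    // getBatch may throw ExecutionSetupException.
    RecordBatch traceBatch = creator.getBatch(context, config, children);
    // traceBatch wraps the single child and passes its vectors through.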
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index b73ddc1..1b990c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -65,139 +65,125 @@ import org.apache.hadoop.fs.Path;
* same set of value vectors (and selection vectors) to its parent record
* batch
*/
-public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace>
-{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
-
- private SelectionVector2 sv = null;
-
- /* Tag associated with each trace operator */
- final String traceTag;
-
- /* Location where the log should be dumped */
- private final String logLocation;
-
- /* File descriptors needed to be able to dump to log file */
- private OutputStream fos;
-
- public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context)
- {
- super(pop, context, incoming);
- this.traceTag = pop.traceTag;
- logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
- String fileName = getFileName();
-
- /* Create the log file we will dump to and initialize the file descriptors */
- try
- {
- Configuration conf = new Configuration();
- conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
- FileSystem fs = FileSystem.get(conf);
-
- /* create the file */
- fos = fs.create(new Path(fileName));
- } catch (IOException e)
- {
- logger.error("Unable to create file: " + fileName);
- }
- }
+public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceRecordBatch.class);
+
+ private SelectionVector2 sv = null;
+
+ /* Tag associated with each trace operator */
+ final String traceTag;
+
+ /* Location where the log should be dumped */
+ private final String logLocation;
+
+ /* File descriptors needed to be able to dump to log file */
+ private OutputStream fos;
+
+ public TraceRecordBatch(Trace pop, RecordBatch incoming, FragmentContext context) {
+ super(pop, context, incoming);
+ this.traceTag = pop.traceTag;
+ logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
+
+ String fileName = getFileName();
+
+ /* Create the log file we will dump to and initialize the file descriptors */
+ try {
+ Configuration conf = new Configuration();
+ conf.set("fs.name.default", ExecConstants.TRACE_DUMP_FILESYSTEM);
+ FileSystem fs = FileSystem.get(conf);
- @Override
- public int getRecordCount()
- {
- if (sv == null)
- return incoming.getRecordCount();
- else
- return sv.getCount();
+ /* create the file */
+ fos = fs.create(new Path(fileName));
+ } catch (IOException e) {
+ logger.error("Unable to create file: " + fileName);
}
+ }
+
+ @Override
+ public int getRecordCount() {
+ if (sv == null)
+ return incoming.getRecordCount();
+ else
+ return sv.getCount();
+ }
+
+ /**
+ * Function is invoked for every record batch and it simply dumps the buffers associated with all the value vectors in
+ * this record batch to a log file.
+ */
+ @Override
+ protected void doWork() {
+
+ boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
+ if (incomingHasSv2) {
+ sv = incoming.getSelectionVector2();
+ } else {
+ sv = null;
+ }
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
+ : false);
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
+
+ try {
+ wrap.writeToStreamAndRetain(fos);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ batch.reconstructContainer(container);
+ }
- /**
- * Function is invoked for every record batch and it simply
- * dumps the buffers associated with all the value vectors in
- * this record batch to a log file.
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException {
+ /* Trace operator does not deal with hyper vectors yet */
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new SchemaChangeException("Trace operator does not work with hyper vectors");
+
+ /*
+ * we have a new schema, clear our existing container to load the new value vectors
*/
- @Override
- protected void doWork()
- {
-
- boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
- if (incomingHasSv2) {
- sv = incoming.getSelectionVector2();
- } else {
- sv = null;
- }
- WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(),
- incoming, incomingHasSv2 ? true : false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
-
- try {
- wrap.writeToStreamAndRetain(fos);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- batch.reconstructContainer(container);
- }
+ container.clear();
- @Override
- protected void setupNewSchema() throws SchemaChangeException
- {
- /* Trace operator does not deal with hyper vectors yet */
- if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new SchemaChangeException("Trace operator does not work with hyper vectors");
-
- /* we have a new schema, clear our existing container to
- * load the new value vectors
- */
- container.clear();
-
- /* Add all the value vectors in the container */
- for(VectorWrapper<?> vv : incoming)
- {
- TransferPair tp = vv.getValueVector().getTransferPair();
- container.add(tp.getTo());
- }
+ /* Add all the value vectors in the container */
+ for (VectorWrapper<?> vv : incoming) {
+ TransferPair tp = vv.getValueVector().getTransferPair();
+ container.add(tp.getTo());
}
+ }
- @Override
- public SelectionVector2 getSelectionVector2() {
- return sv;
- }
+ @Override
+ public SelectionVector2 getSelectionVector2() {
+ return sv;
+ }
- private String getFileName()
- {
- /* From the context, get the query id, major fragment id,
- * minor fragment id. This will be used as the file name
- * to which we will dump the incoming buffer data
- */
- FragmentHandle handle = incoming.getContext().getHandle();
+ private String getFileName() {
+ /*
+ * From the context, get the query id, major fragment id, minor fragment id. This will be used as the file name to
+ * which we will dump the incoming buffer data
+ */
+ FragmentHandle handle = incoming.getContext().getHandle();
- String qid = QueryIdHelper.getQueryId(handle.getQueryId());
+ String qid = QueryIdHelper.getQueryId(handle.getQueryId());
- int majorFragmentId = handle.getMajorFragmentId();
- int minorFragmentId = handle.getMinorFragmentId();
+ int majorFragmentId = handle.getMajorFragmentId();
+ int minorFragmentId = handle.getMinorFragmentId();
- String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
+ String fileName = String.format("%s//%s_%s_%s_%s", logLocation, qid, majorFragmentId, minorFragmentId, traceTag);
- return fileName;
- }
+ return fileName;
+ }
+ @Override
+ protected void cleanup() {
+ /* Release the selection vector */
+ if (sv != null)
+ sv.clear();
- @Override
- protected void cleanup()
- {
- /* Release the selection vector */
- if (sv != null)
- sv.clear();
-
- /* Close the file descriptors */
- try
- {
- fos.close();
- } catch (IOException e)
- {
- logger.error("Unable to close file descriptors for file: " + getFileName());
- }
+ /* Close the file descriptors */
+ try {
+ fos.close();
+ } catch (IOException e) {
+ logger.error("Unable to close file descriptors for file: " + getFileName());
}
+ }
}
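
As a concrete illustration of the naming scheme in getFileName() above, the sketch below evaluates the same format string with invented values (note the double slash produced by the "%s//%s" pattern in the original code):

    // Same format string as getFileName(); every value here is invented.
    String fileName = String.format("%s//%s_%s_%s_%s",
        "/tmp/drill/trace", "q1", 2, 0, "trace1");
    // fileName is "/tmp/drill/trace//q1_2_0_trace1"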
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/8a571b3e/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index a33ca37..e9b56db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -60,17 +60,15 @@ public class WritableBatch {
return buffers;
}
- public void reconstructContainer(VectorContainer container)
- {
- Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
- if (buffers.length > 0) /* If we have ByteBuf's associated with value vectors */
- {
-
+ public void reconstructContainer(VectorContainer container) {
+ Preconditions.checkState(!cleared,
+ "Attempted to reconstruct a container from a WritableBatch after it had been cleared");
+ if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
+
CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
- /* Copy data from each buffer into the compound buffer */
- for (ByteBuf buf : buffers)
- {
+ /* Copy data from each buffer into the compound buffer */
+ for (ByteBuf buf : buffers) {
cbb.addComponent(buf);
}
@@ -78,13 +76,12 @@ public class WritableBatch {
int bufferOffset = 0;
- /* For each value vector slice up the appropriate size from
- * the compound buffer and load it into the value vector
- */
+ /*
+ * For each value vector slice up the appropriate size from the compound buffer and load it into the value vector
+ */
int vectorIndex = 0;
- for(VectorWrapper<?> vv : container)
- {
+ for (VectorWrapper<?> vv : container) {
FieldMetadata fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
@@ -101,9 +98,8 @@ public class WritableBatch {
}
container.buildSchema(svMode);
- /* Set the record count in the value vector */
- for(VectorWrapper<?> v : container)
- {
+ /* Set the record count in the value vector */
+ for (VectorWrapper<?> v : container) {
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(def.getRecordCount());
}
@@ -118,23 +114,24 @@ public class WritableBatch {
public static WritableBatch getBatchNoHVWrap(int recordCount, Iterable<VectorWrapper<?>> vws, boolean isSV2) {
List<ValueVector> vectors = Lists.newArrayList();
- for(VectorWrapper<?> vw : vws){
+ for (VectorWrapper<?> vw : vws) {
Preconditions.checkArgument(!vw.isHyper());
vectors.add(vw.getValueVector());
}
return getBatchNoHV(recordCount, vectors, isSV2);
}
-
+
public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
List<ByteBuf> buffers = Lists.newArrayList();
List<FieldMetadata> metadata = Lists.newArrayList();
for (ValueVector vv : vectors) {
metadata.add(vv.getMetadata());
-
- // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
- if(recordCount == 0) continue;
-
+
+ // don't try to get the buffers if we don't have any records. It is possible the buffers are dead buffers.
+ if (recordCount == 0)
+ continue;
+
for (ByteBuf b : vv.getBuffers()) {
buffers.add(b);
}
@@ -142,14 +139,15 @@ public class WritableBatch {
vv.clear();
}
- RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount).setIsSelectionVector2(isSV2).build();
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
+ .setIsSelectionVector2(isSV2).build();
WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
}
-
+
public static WritableBatch get(RecordBatch batch) {
- if(batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
- throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
+ if (batch.getSchema() != null && batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE)
+ throw new UnsupportedOperationException("Only batches without hyper selections vectors are writable.");
boolean sv2 = (batch.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE);
return getBatchNoHVWrap(batch.getRecordCount(), batch, sv2);
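
For context, a hedged sketch of the serialize-and-reconstruct round trip these WritableBatch methods support, mirroring how TraceRecordBatch.doWork() above uses them; incoming, sv, context, container and out are assumed to already be in scope:

    // Assumed in scope: incoming (RecordBatch), sv (SelectionVector2 or null),
    // context (FragmentContext), container (VectorContainer), out (OutputStream).
    WritableBatch batch = WritableBatch.get(incoming); // rejects hyper batches
    VectorAccessibleSerializable wrap =
        new VectorAccessibleSerializable(batch, sv, context.getAllocator());
    wrap.writeToStreamAndRetain(out);      // dumps buffers; may throw IOException
    batch.reconstructContainer(container); // reloads the value vectors afterwards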