You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/12/17 12:48:29 UTC
[drill] 06/12: DRILL-7476: Set lastSet on TransferPair copies
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 79ea7b1f13c4864bfc9a25049b25b3369fce07cc
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue Dec 10 20:15:32 2019 -0800
DRILL-7476: Set lastSet on TransferPair copies
Variable-width nullable vectors maintain a "lastSet" field
in the mutator. This field is used in "fill empties" logic
when setting the vector's value count. This is true even
if the vector is read-only, or has been transferred from
another (read-only) vector. LastSet must be set to the
row count or the code will helpfully overwrite existing
offsets with 0.
closes #1922
---
.../org/apache/drill/exec/client/DrillClient.java | 6 +-
.../exec/physical/impl/limit/LimitRecordBatch.java | 5 +-
.../unorderedreceiver/UnorderedReceiverBatch.java | 2 +-
.../physical/impl/validate/BatchValidator.java | 43 +++++++++-
.../drill/exec/record/FragmentWritableBatch.java | 80 ++++++++++--------
.../apache/drill/exec/record/WritableBatch.java | 7 +-
.../drill/exec/work/fragment/FragmentExecutor.java | 95 ++++++++++++++--------
.../java/org/apache/drill/test/QueryBuilder.java | 20 ++---
.../codegen/templates/NullableValueVectors.java | 9 ++
9 files changed, 181 insertions(+), 86 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7dc4d59..237aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -112,8 +112,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
private volatile ClusterCoordinator clusterCoordinator;
private volatile boolean connected = false;
private final BufferAllocator allocator;
- private int reconnectTimes;
- private int reconnectDelay;
+ private final int reconnectTimes;
+ private final int reconnectDelay;
private boolean supportComplexTypes;
private final boolean ownsZkConnection;
private final boolean ownsAllocator;
@@ -862,7 +862,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
@Override
public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
- logger.debug("Result arrived: Result: {}", result );
+ logger.debug("Result arrived: Result: {}", result);
results.add(result);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index e9d7dd3..25c79ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -41,7 +41,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);
- private SelectionVector2 outgoingSv;
+ private final SelectionVector2 outgoingSv;
private SelectionVector2 incomingSv;
// Start offset of the records
@@ -234,7 +234,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
outgoingSv.setRecordCount(svIndex);
outgoingSv.setBatchActualRecordCount(inputRecordCount);
// Actual number of values in the container; not the number in
- // the SV.
+ // the SV. Set record count, not value count. Value count is
+ // carried over from input vectors.
container.setRecordCount(inputRecordCount);
// Update the start offset
recordStartOffset = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index d40bd6d..f5fb77a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -206,7 +206,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
batch.release();
- if(schemaChanged) {
+ if (schemaChanged) {
this.schema = batchLoader.getSchema();
stats.batchReceived(0, rbd.getRecordCount(), true);
lastOutcome = IterOutcome.OK_NEW_SCHEMA;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 36d9c8f..2c657e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVar16CharVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.RepeatedBitVector;
import org.apache.drill.exec.vector.UInt1Vector;
@@ -321,10 +325,38 @@ public class BatchValidator {
"Outer value count = %d, but inner value count = %d",
outerCount, valueCount));
}
+ int lastSet = getLastSet(vector);
+ if (lastSet != -2) {
+ if (lastSet != valueCount - 1) {
+ error(name, vector, String.format(
+ "Value count = %d, but last set = %d",
+ valueCount, lastSet));
+ }
+ }
verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
validateVector(name + "-values", valuesVector);
}
+ // getLastSet() is visible per vector type, not on a super class.
+ // There is no common nullable, variable width super class.
+
+ private int getLastSet(NullableVector vector) {
+ if (vector instanceof NullableVarCharVector) {
+ return ((NullableVarCharVector) vector).getMutator().getLastSet();
+ }
+ if (vector instanceof NullableVarBinaryVector) {
+ return ((NullableVarBinaryVector) vector).getMutator().getLastSet();
+ }
+ if (vector instanceof NullableVarDecimalVector) {
+ return ((NullableVarDecimalVector) vector).getMutator().getLastSet();
+ }
+ if (vector instanceof NullableVar16CharVector) {
+ return ((NullableVar16CharVector) vector).getMutator().getLastSet();
+ }
+ // Otherwise, return a value that is never legal for lastSet
+ return -2;
+ }
+
private void validateVarCharVector(String name, VarCharVector vector) {
int dataLength = vector.getBuffer().writerIndex();
validateVarWidthVector(name, vector, dataLength);
@@ -332,7 +364,12 @@ public class BatchValidator {
private void validateVarBinaryVector(String name, VarBinaryVector vector) {
int dataLength = vector.getBuffer().writerIndex();
- validateVarWidthVector(name, vector, dataLength);
+ int lastOffset = validateVarWidthVector(name, vector, dataLength);
+ if (lastOffset != dataLength) {
+ error(name, vector, String.format(
+ "Data vector has length %d, but offset vector has largest offset %d",
+ dataLength, lastOffset));
+ }
}
private void validateVarDecimalVector(String name, VarDecimalVector vector) {
@@ -340,9 +377,9 @@ public class BatchValidator {
validateVarWidthVector(name, vector, dataLength);
}
- private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
+ private int validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
int valueCount = vector.getAccessor().getValueCount();
- validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+ return validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
valueCount, dataLength);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 5606d75..bbb988a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -23,25 +23,36 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-public class FragmentWritableBatch{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
+public class FragmentWritableBatch {
private static RecordBatchDef EMPTY_DEF = RecordBatchDef.newBuilder().setRecordCount(0).build();
private final ByteBuf[] buffers;
private final FragmentRecordBatch header;
- public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
- this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
+ public FragmentWritableBatch(boolean isLast, QueryId queryId,
+ int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int receiveMinorFragmentId,
+ WritableBatch batch) {
+ this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+ new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
}
- public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
- this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
+ public FragmentWritableBatch(boolean isLast, QueryId queryId,
+ int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int[] receiveMinorFragmentIds,
+ WritableBatch batch) {
+ this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId,
+ receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(),
+ batch.getBuffers());
}
- private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
+ private FragmentWritableBatch(boolean isLast, QueryId queryId,
+ int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int[] receiveMinorFragmentId,
+ RecordBatchDef def, ByteBuf... buffers) {
this.buffers = buffers;
- final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+ FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
.setIsLastBatch(isLast)
.setDef(def)
.setQueryId(queryId)
@@ -49,49 +60,60 @@ public class FragmentWritableBatch{
.setSendingMajorFragmentId(sendMajorFragmentId)
.setSendingMinorFragmentId(sendMinorFragmentId);
- for(final int i : receiveMinorFragmentId){
- builder.addReceivingMinorFragmentId(i);
+ for (int fragmentId : receiveMinorFragmentId) {
+ builder.addReceivingMinorFragmentId(fragmentId);
}
this.header = builder.build();
}
-
- public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
- return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
+ public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+ int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int receiveMinorFragmentId) {
+ return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId,
+ receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
}
- public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
- return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
+ public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+ int sendMajorFragmentId, int sendMinorFragmentId,
+ int receiveMajorFragmentId, int[] receiveMinorFragmentIds) {
+ return new FragmentWritableBatch(true, queryId, sendMajorFragmentId,
+ sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds,
+ EMPTY_DEF);
}
-
- public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
- final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
- return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+ public static FragmentWritableBatch getEmptyLastWithSchema(
+ QueryId queryId, int sendMajorFragmentId,
+ int sendMinorFragmentId, int receiveMajorFragmentId,
+ int receiveMinorFragmentId, BatchSchema schema) {
+ return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId,
+ sendMinorFragmentId, receiveMajorFragmentId,
receiveMinorFragmentId, schema);
}
- public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
- final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
+ public static FragmentWritableBatch getEmptyBatchWithSchema(
+ boolean isLast, QueryId queryId, int sendMajorFragmentId,
+ int sendMinorFragmentId, int receiveMajorFragmentId,
+ int receiveMinorFragmentId, BatchSchema schema) {
- final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+ RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
if (schema != null) {
- for (final MaterializedField field : schema) {
+ for (MaterializedField field : schema) {
def.addField(field.getSerializedField());
}
}
- return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+ return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId,
+ sendMinorFragmentId, receiveMajorFragmentId,
new int[] { receiveMinorFragmentId }, def.build());
}
- public ByteBuf[] getBuffers(){
+ public ByteBuf[] getBuffers() {
return buffers;
}
public long getByteCount() {
long n = 0;
- for (final ByteBuf buf : buffers) {
+ for (ByteBuf buf : buffers) {
n += buf.readableBytes();
}
return n;
@@ -99,11 +121,5 @@ public class FragmentWritableBatch{
public FragmentRecordBatch getHeader() {
return header;
-
}
-
-
-
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 577517d..966ade7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,8 +163,11 @@ public class WritableBatch implements AutoCloseable {
vv.clear();
}
- RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
- .setCarriesTwoByteSelectionVector(isSV2).build();
+ RecordBatchDef batchDef = RecordBatchDef.newBuilder()
+ .addAllField(metadata)
+ .setRecordCount(recordCount)
+ .setCarriesTwoByteSelectionVector(isSV2)
+ .build();
WritableBatch b = new WritableBatch(batchDef, buffers);
return b;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 3e4d94a..79884e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -48,60 +48,89 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
/**
* <h2>Overview</h2>
* <p>
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages.
+ * Responsible for running a single fragment on a single Drillbit.
+ * Listens/responds to status request and cancellation messages.
* </p>
* <h2>Theory of Operation</h2>
* <p>
- * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running
- * it may be subject to termination requests. The {@link FragmentExecutor} is reponsible for gracefully handling termination requests for the {@link RootExec}. There
- * are two types of termination messages:
- * <ol>
- * <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li>
- * <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs anymore data. A fragment may recieve multiple receiver finished requests
- * (one for each downstream receiver). The {@link RootExec} will only terminate once it has recieved {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages
- * for all downstream receivers.</li>
- * </ol>
+ * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the
+ * {@link FragmentExecutor#run()} method in a single thread. While a fragment is
+ * running it may be subject to termination requests. The
+ * {@link FragmentExecutor} is responsible for gracefully handling termination
+ * requests for the {@link RootExec}. There are two types of termination
+ * messages:
+ * <ol>
+ * <li><b>Cancellation Request:</b> This signals that the fragment and therefore
+ * the {@link RootExec} need to terminate immediately.</li>
+ * <li><b>Receiver Finished:</b> This signals that a downstream receiver no
+ * longer needs anymore data. A fragment may receive multiple receiver finished
+ * requests (one for each downstream receiver). The {@link RootExec} will only
+ * terminate once it has received
+ * {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages for all
+ * downstream receivers.</li>
+ * </ol>
* </p>
* <p>
- * The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when
- * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
- * called. The way in which these signals are handled is the following:
+ * The {@link FragmentExecutor} processes termination requests appropriately for
+ * the {@link RootExec}. A <b>Cancellation Request</b> is signaled when
+ * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event
+ * is signaled when
+ * {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called.
+ * The way in which these signals are handled is the following:
* </p>
* <h3>Cancellation Request</h3>
* <p>
- * There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called.
- * <ol>
- * <li>The Cancellation Request is recieved before the {@link RootExec} for the fragment is even started. In this case we can cleanup resources allocated for the fragment
- * and never start a {@link RootExec}</li>
- * <li>The Cancellation Request is recieve after the {@link RootExec} for the fragment is started. In this the cancellation request is sent to the
- * {@link FragmentEventProcessor}. If this is not the first cancellation request it is ignored. If this is the first cancellation request the {@link RootExec} for this
- * fragment is terminated by interrupting it. Then the {@link FragmentExecutor#run()} thread proceeds to cleanup resources normally</li>
- * </ol>
+ * There are two ways in which a cancellation request can be handled when
+ * {@link FragmentExecutor#cancel()} is called.
+ * <ol>
+ * <li>The Cancellation Request is received before the {@link RootExec} for the
+ * fragment is even started. In this case we can cleanup resources allocated for
+ * the fragment and never start a {@link RootExec}</li>
+ * <li>The Cancellation Request is receive after the {@link RootExec} for the
+ * fragment is started. In this the cancellation request is sent to the
+ * {@link FragmentEventProcessor}. If this is not the first cancellation request
+ * it is ignored. If this is the first cancellation request the {@link RootExec}
+ * for this fragment is terminated by interrupting it. Then the
+ * {@link FragmentExecutor#run()} thread proceeds to cleanup resources
+ * normally</li>
+ * </ol>
* </p>
* <h3>Receiver Finished</h3>
* <p>
- * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we
- * did not already recieve a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} polls
- * {@link FragmentExecutor#receiverFinishedQueue} and singlas the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
+ * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
+ * called, the message is passed to the {@link FragmentEventProcessor} if we did
+ * not already receive a Cancellation request. Then the finished message is
+ * queued in {@link FragmentExecutor#receiverFinishedQueue}. The
+ * {@link FragmentExecutor#run()} polls
+ * {@link FragmentExecutor#receiverFinishedQueue} and signals the
+ * {@link RootExec} with
+ * {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
* </p>
- * <h2>Possible Design Flaws / Poorly Defined Behavoir</h2>
+ * <h2>Possible Design Flaws / Poorly Defined Behavior</h2>
* <p>
- * There are still a few aspects of the {@link FragmentExecutor} design that are not clear.
- * <ol>
- * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li>
- * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li>
- * <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li>
- * </ol>
+ * There are still a few aspects of the {@link FragmentExecutor} design that are
+ * not clear.
+ * <ol>
+ * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver,
+ * will we eventually get one from every downstream receiver?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we cancel the fragment?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we run out of data from the
+ * upstream?</li>
+ * </ol>
* </p>
*/
public class FragmentExecutor implements Runnable {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+ private static final Logger logger = LoggerFactory.getLogger(FragmentExecutor.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
private final String fragmentName;
@@ -542,7 +571,7 @@ public class FragmentExecutor implements Runnable {
* This is especially important as fragments can take longer to start
*/
private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
- private AtomicBoolean terminate = new AtomicBoolean(false);
+ private final AtomicBoolean terminate = new AtomicBoolean(false);
void cancel() {
sendEvent(new FragmentEvent(EventType.CANCEL, null));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 6c7176b..2352d0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -86,7 +86,7 @@ public class QueryBuilder {
private QueryId queryId;
private int recordCount;
private int batchCount;
- private long startTime;
+ private final long startTime;
public SummaryOnlyQueryEventListener(QuerySummaryFuture future) {
this.future = future;
@@ -132,7 +132,7 @@ public class QueryBuilder {
* launched the query.
*/
- private CountDownLatch lock = new CountDownLatch(1);
+ private final CountDownLatch lock = new CountDownLatch(1);
private QuerySummary summary;
/**
@@ -373,7 +373,7 @@ public class QueryBuilder {
// Unload the batch and convert to a row set.
- final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+ RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
try {
loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
resultBatch.release();
@@ -760,18 +760,18 @@ public class QueryBuilder {
*/
private String queryPlan(String columnName) throws Exception {
Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
- final List<QueryDataBatch> results = results();
- final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
- final StringBuilder builder = new StringBuilder();
+ List<QueryDataBatch> results = results();
+ RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+ StringBuilder builder = new StringBuilder();
- for (final QueryDataBatch b : results) {
+ for (QueryDataBatch b : results) {
if (!b.hasData()) {
continue;
}
loader.load(b.getHeader().getDef(), b.getData());
- final VectorWrapper<?> vw;
+ VectorWrapper<?> vw;
try {
vw = loader.getValueAccessorById(
NullableVarCharVector.class,
@@ -780,9 +780,9 @@ public class QueryBuilder {
throw new IllegalStateException("Looks like you did not provide an explain plan query, please add EXPLAIN PLAN FOR to the beginning of your query.");
}
- final ValueVector vv = vw.getValueVector();
+ ValueVector vv = vw.getValueVector();
for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
- final Object o = vv.getAccessor().getObject(i);
+ Object o = vv.getAccessor().getObject(i);
builder.append(o);
}
loader.clear();
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index e32ecc9..d92cb5d 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -299,6 +299,15 @@ public final class ${className} extends BaseDataValueVector implements <#if type
int bitsLength = bitsField.getBufferLength();
SerializedField valuesField = metadata.getChild(1);
values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
+ <#if type.major == "VarLen">
+
+ // Though a loaded vector should be read only,
+ // it can have its values set such as when copying
+ // with transfer pairs. Since lastSet is used when
+ // setting values, it must be set on vector load.
+
+ mutator.lastSet = accessor.getValueCount() - 1;
+ </#if>
}
@Override