You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/12/17 12:48:29 UTC

[drill] 06/12: DRILL-7476: Set lastSet on TransferPair copies

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 79ea7b1f13c4864bfc9a25049b25b3369fce07cc
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Tue Dec 10 20:15:32 2019 -0800

    DRILL-7476: Set lastSet on TransferPair copies
    
    Variable-width nullable vectors maintain a "lastSet" field
    in the mutator. This field is used in "fill empties" logic
    when setting the vector's value count. This is true even
    if the vector is read-only, or has been transferred from
    another (read-only) vector. LastSet must be set to the
    index of the last row (row count - 1) or the code will
    helpfully overwrite existing offsets with 0.
    
    closes #1922
---
 .../org/apache/drill/exec/client/DrillClient.java  |  6 +-
 .../exec/physical/impl/limit/LimitRecordBatch.java |  5 +-
 .../unorderedreceiver/UnorderedReceiverBatch.java  |  2 +-
 .../physical/impl/validate/BatchValidator.java     | 43 +++++++++-
 .../drill/exec/record/FragmentWritableBatch.java   | 80 ++++++++++--------
 .../apache/drill/exec/record/WritableBatch.java    |  7 +-
 .../drill/exec/work/fragment/FragmentExecutor.java | 95 ++++++++++++++--------
 .../java/org/apache/drill/test/QueryBuilder.java   | 20 ++---
 .../codegen/templates/NullableValueVectors.java    |  9 ++
 9 files changed, 181 insertions(+), 86 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7dc4d59..237aba1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -112,8 +112,8 @@ public class DrillClient implements Closeable, ConnectionThrottle {
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
   private final BufferAllocator allocator;
-  private int reconnectTimes;
-  private int reconnectDelay;
+  private final int reconnectTimes;
+  private final int reconnectDelay;
   private boolean supportComplexTypes;
   private final boolean ownsZkConnection;
   private final boolean ownsAllocator;
@@ -862,7 +862,7 @@ public class DrillClient implements Closeable, ConnectionThrottle {
 
     @Override
     public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
-      logger.debug("Result arrived:  Result: {}", result );
+      logger.debug("Result arrived:  Result: {}", result);
       results.add(result);
     }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index e9d7dd3..25c79ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -41,7 +41,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
 public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
   private static final Logger logger = LoggerFactory.getLogger(LimitRecordBatch.class);
 
-  private SelectionVector2 outgoingSv;
+  private final SelectionVector2 outgoingSv;
   private SelectionVector2 incomingSv;
 
   // Start offset of the records
@@ -234,7 +234,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
     outgoingSv.setRecordCount(svIndex);
     outgoingSv.setBatchActualRecordCount(inputRecordCount);
     // Actual number of values in the container; not the number in
-    // the SV.
+    // the SV. Set record count, not value count. Value count is
+    // carried over from input vectors.
     container.setRecordCount(inputRecordCount);
     // Update the start offset
     recordStartOffset = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index d40bd6d..f5fb77a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -206,7 +206,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
       stats.addLongStat(Metric.BYTES_RECEIVED, batch.getByteCount());
 
       batch.release();
-      if(schemaChanged) {
+      if (schemaChanged) {
         this.schema = batchLoader.getSchema();
         stats.batchReceived(0, rbd.getRecordCount(), true);
         lastOutcome = IterOutcome.OK_NEW_SCHEMA;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
index 36d9c8f..2c657e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/BatchValidator.java
@@ -25,6 +25,10 @@ import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.BitVector;
 import org.apache.drill.exec.vector.FixedWidthVector;
+import org.apache.drill.exec.vector.NullableVar16CharVector;
+import org.apache.drill.exec.vector.NullableVarBinaryVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.NullableVarDecimalVector;
 import org.apache.drill.exec.vector.NullableVector;
 import org.apache.drill.exec.vector.RepeatedBitVector;
 import org.apache.drill.exec.vector.UInt1Vector;
@@ -321,10 +325,38 @@ public class BatchValidator {
           "Outer value count = %d, but inner value count = %d",
           outerCount, valueCount));
     }
+    int lastSet = getLastSet(vector);
+    if (lastSet != -2) {
+      if (lastSet != valueCount - 1) {
+        error(name, vector, String.format(
+            "Value count = %d, but last set = %d",
+            valueCount, lastSet));
+      }
+    }
     verifyIsSetVector(vector, (UInt1Vector) vector.getBitsVector());
     validateVector(name + "-values", valuesVector);
   }
 
+  // getLastSet() is visible per vector type, not on a super class.
+  // There is no common nullable, variable width super class.
+
+  private int getLastSet(NullableVector vector) {
+    if (vector instanceof NullableVarCharVector) {
+      return ((NullableVarCharVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVarBinaryVector) {
+      return ((NullableVarBinaryVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVarDecimalVector) {
+      return ((NullableVarDecimalVector) vector).getMutator().getLastSet();
+    }
+    if (vector instanceof NullableVar16CharVector) {
+      return ((NullableVar16CharVector) vector).getMutator().getLastSet();
+    }
+    // Otherwise, return a value that is never legal for lastSet
+    return -2;
+  }
+
   private void validateVarCharVector(String name, VarCharVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
     validateVarWidthVector(name, vector, dataLength);
@@ -332,7 +364,12 @@ public class BatchValidator {
 
   private void validateVarBinaryVector(String name, VarBinaryVector vector) {
     int dataLength = vector.getBuffer().writerIndex();
-    validateVarWidthVector(name, vector, dataLength);
+    int lastOffset = validateVarWidthVector(name, vector, dataLength);
+    if (lastOffset != dataLength) {
+      error(name, vector, String.format(
+          "Data vector has length %d, but offset vector has largest offset %d",
+          dataLength, lastOffset));
+    }
   }
 
   private void validateVarDecimalVector(String name, VarDecimalVector vector) {
@@ -340,9 +377,9 @@ public class BatchValidator {
     validateVarWidthVector(name, vector, dataLength);
   }
 
-  private void validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
+  private int validateVarWidthVector(String name, VariableWidthVector vector, int dataLength) {
     int valueCount = vector.getAccessor().getValueCount();
-    validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
+    return validateOffsetVector(name + "-offsets", vector.getOffsetVector(),
         valueCount, dataLength);
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
index 5606d75..bbb988a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/FragmentWritableBatch.java
@@ -23,25 +23,36 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 
-public class FragmentWritableBatch{
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentWritableBatch.class);
+public class FragmentWritableBatch {
 
   private static RecordBatchDef EMPTY_DEF = RecordBatchDef.newBuilder().setRecordCount(0).build();
 
   private final ByteBuf[] buffers;
   private final FragmentRecordBatch header;
 
-  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final WritableBatch batch){
-    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
+  public FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int receiveMinorFragmentId,
+      WritableBatch batch) {
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+        new int[]{receiveMinorFragmentId}, batch.getDef(), batch.getBuffers());
   }
 
-  public FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds, final WritableBatch batch){
-    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(), batch.getBuffers());
+  public FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentIds,
+      WritableBatch batch) {
+    this(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId,
+        receiveMajorFragmentId, receiveMinorFragmentIds, batch.getDef(),
+        batch.getBuffers());
   }
 
-  private FragmentWritableBatch(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentId, final RecordBatchDef def, final ByteBuf... buffers){
+  private FragmentWritableBatch(boolean isLast, QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentId,
+      RecordBatchDef def, ByteBuf... buffers) {
     this.buffers = buffers;
-    final FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
+    FragmentRecordBatch.Builder builder = FragmentRecordBatch.newBuilder()
         .setIsLastBatch(isLast)
         .setDef(def)
         .setQueryId(queryId)
@@ -49,49 +60,60 @@ public class FragmentWritableBatch{
         .setSendingMajorFragmentId(sendMajorFragmentId)
         .setSendingMinorFragmentId(sendMinorFragmentId);
 
-    for(final int i : receiveMinorFragmentId){
-      builder.addReceivingMinorFragmentId(i);
+    for (int fragmentId : receiveMinorFragmentId) {
+      builder.addReceivingMinorFragmentId(fragmentId);
     }
 
     this.header = builder.build();
   }
 
-
-  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId){
-    return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int receiveMinorFragmentId) {
+    return getEmptyLast(queryId, sendMajorFragmentId, sendMinorFragmentId,
+        receiveMajorFragmentId, new int[]{receiveMinorFragmentId});
   }
 
-  public static FragmentWritableBatch getEmptyLast(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId, final int receiveMajorFragmentId, final int[] receiveMinorFragmentIds){
-    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds, EMPTY_DEF);
+  public static FragmentWritableBatch getEmptyLast(QueryId queryId,
+      int sendMajorFragmentId, int sendMinorFragmentId,
+      int receiveMajorFragmentId, int[] receiveMinorFragmentIds) {
+    return new FragmentWritableBatch(true, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId, receiveMinorFragmentIds,
+        EMPTY_DEF);
   }
 
-
-  public static FragmentWritableBatch getEmptyLastWithSchema(final QueryId queryId, final int sendMajorFragmentId, final int sendMinorFragmentId,
-                                                             final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
-    return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+  public static FragmentWritableBatch getEmptyLastWithSchema(
+      QueryId queryId, int sendMajorFragmentId,
+      int sendMinorFragmentId, int receiveMajorFragmentId,
+      int receiveMinorFragmentId, BatchSchema schema) {
+    return getEmptyBatchWithSchema(true, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId,
         receiveMinorFragmentId, schema);
   }
 
-  public static FragmentWritableBatch getEmptyBatchWithSchema(final boolean isLast, final QueryId queryId, final int sendMajorFragmentId,
-      final int sendMinorFragmentId, final int receiveMajorFragmentId, final int receiveMinorFragmentId, final BatchSchema schema){
+  public static FragmentWritableBatch getEmptyBatchWithSchema(
+      boolean isLast, QueryId queryId, int sendMajorFragmentId,
+      int sendMinorFragmentId, int receiveMajorFragmentId,
+      int receiveMinorFragmentId, BatchSchema schema) {
 
-    final RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
+    RecordBatchDef.Builder def = RecordBatchDef.newBuilder();
     if (schema != null) {
-      for (final MaterializedField field : schema) {
+      for (MaterializedField field : schema) {
         def.addField(field.getSerializedField());
       }
     }
-    return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId, sendMinorFragmentId, receiveMajorFragmentId,
+    return new FragmentWritableBatch(isLast, queryId, sendMajorFragmentId,
+        sendMinorFragmentId, receiveMajorFragmentId,
         new int[] { receiveMinorFragmentId }, def.build());
   }
 
-  public ByteBuf[] getBuffers(){
+  public ByteBuf[] getBuffers() {
     return buffers;
   }
 
   public long getByteCount() {
     long n = 0;
-    for (final ByteBuf buf : buffers) {
+    for (ByteBuf buf : buffers) {
       n += buf.readableBytes();
     }
     return n;
@@ -99,11 +121,5 @@ public class FragmentWritableBatch{
 
   public FragmentRecordBatch getHeader() {
     return header;
-
   }
-
-
-
-
-
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 577517d..966ade7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -163,8 +163,11 @@ public class WritableBatch implements AutoCloseable {
       vv.clear();
     }
 
-    RecordBatchDef batchDef = RecordBatchDef.newBuilder().addAllField(metadata).setRecordCount(recordCount)
-        .setCarriesTwoByteSelectionVector(isSV2).build();
+    RecordBatchDef batchDef = RecordBatchDef.newBuilder()
+        .addAllField(metadata)
+        .setRecordCount(recordCount)
+        .setCarriesTwoByteSelectionVector(isSV2)
+        .build();
     WritableBatch b = new WritableBatch(batchDef, buffers);
     return b;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 3e4d94a..79884e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -48,60 +48,89 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM;
 
 /**
  * <h2>Overview</h2>
  * <p>
- *   Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation messages.
+ * Responsible for running a single fragment on a single Drillbit.
+ * Listens/responds to status request and cancellation messages.
  * </p>
  * <h2>Theory of Operation</h2>
  * <p>
- *  The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the {@link FragmentExecutor#run()} method in a single thread. While a fragment is running
- *  it may be subject to termination requests. The {@link FragmentExecutor} is reponsible for gracefully handling termination requests for the {@link RootExec}. There
- *  are two types of termination messages:
- *  <ol>
- *    <li><b>Cancellation Request:</b> This signals that the fragment and therefore the {@link RootExec} need to terminate immediately.</li>
- *    <li><b>Receiver Finished:</b> This signals that a downstream receiver no longer needs anymore data. A fragment may recieve multiple receiver finished requests
- *    (one for each downstream receiver). The {@link RootExec} will only terminate once it has recieved {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages
- *    for all downstream receivers.</li>
- *  </ol>
+ * The {@link FragmentExecutor} runs a fragment's {@link RootExec} in the
+ * {@link FragmentExecutor#run()} method in a single thread. While a fragment is
+ * running it may be subject to termination requests. The
+ * {@link FragmentExecutor} is responsible for gracefully handling termination
+ * requests for the {@link RootExec}. There are two types of termination
+ * messages:
+ * <ol>
+ * <li><b>Cancellation Request:</b> This signals that the fragment and therefore
+ * the {@link RootExec} need to terminate immediately.</li>
+ * <li><b>Receiver Finished:</b> This signals that a downstream receiver no
+ * longer needs any more data. A fragment may receive multiple receiver finished
+ * requests (one for each downstream receiver). The {@link RootExec} will only
+ * terminate once it has received
+ * {@link FragmentExecutor.EventType#RECEIVER_FINISHED} messages for all
+ * downstream receivers.</li>
+ * </ol>
  * </p>
  * <p>
- *   The {@link FragmentExecutor} processes termination requests appropriately for the {@link RootExec}. A <b>Cancellation Request</b> is signalled when
- *   {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event is signalled when {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
- *   called. The way in which these signals are handled is the following:
+ * The {@link FragmentExecutor} processes termination requests appropriately for
+ * the {@link RootExec}. A <b>Cancellation Request</b> is signaled when
+ * {@link FragmentExecutor#cancel()} is called. A <b>Receiver Finished</b> event
+ * is signaled when
+ * {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called.
+ * The way in which these signals are handled is the following:
  * </p>
  * <h3>Cancellation Request</h3>
  * <p>
- *   There are two ways in which a cancellation request can be handled when {@link FragmentExecutor#cancel()} is called.
- *   <ol>
- *     <li>The Cancellation Request is recieved before the {@link RootExec} for the fragment is even started. In this case we can cleanup resources allocated for the fragment
- *     and never start a {@link RootExec}</li>
- *     <li>The Cancellation Request is recieve after the {@link RootExec} for the fragment is started. In this the cancellation request is sent to the
- *     {@link FragmentEventProcessor}. If this is not the first cancellation request it is ignored. If this is the first cancellation request the {@link RootExec} for this
- *     fragment is terminated by interrupting it. Then the {@link FragmentExecutor#run()} thread proceeds to cleanup resources normally</li>
- *   </ol>
+ * There are two ways in which a cancellation request can be handled when
+ * {@link FragmentExecutor#cancel()} is called.
+ * <ol>
+ * <li>The Cancellation Request is received before the {@link RootExec} for the
+ * fragment is even started. In this case we can clean up resources allocated for
+ * the fragment and never start a {@link RootExec}</li>
+ * <li>The Cancellation Request is received after the {@link RootExec} for the
+ * fragment is started. In this case the cancellation request is sent to the
+ * {@link FragmentEventProcessor}. If this is not the first cancellation request
+ * it is ignored. If this is the first cancellation request the {@link RootExec}
+ * for this fragment is terminated by interrupting it. Then the
+ * {@link FragmentExecutor#run()} thread proceeds to clean up resources
+ * normally</li>
+ * </ol>
  * </p>
  * <h3>Receiver Finished</h3>
  * <p>
- *  When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is called, the message is passed to the {@link FragmentEventProcessor} if we
- *  did not already recieve a Cancellation request. Then the finished message is queued in {@link FragmentExecutor#receiverFinishedQueue}. The {@link FragmentExecutor#run()} polls
- *  {@link FragmentExecutor#receiverFinishedQueue} and singlas the {@link RootExec} with {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
+ * When {@link FragmentExecutor#receivingFragmentFinished(FragmentHandle)} is
+ * called, the message is passed to the {@link FragmentEventProcessor} if we did
+ * not already receive a Cancellation request. Then the finished message is
+ * queued in {@link FragmentExecutor#receiverFinishedQueue}. The
+ * {@link FragmentExecutor#run()} polls
+ * {@link FragmentExecutor#receiverFinishedQueue} and signals the
+ * {@link RootExec} with
+ * {@link RootExec#receivingFragmentFinished(FragmentHandle)} appropriately.
  * </p>
- * <h2>Possible Design Flaws / Poorly Defined Behavoir</h2>
+ * <h2>Possible Design Flaws / Poorly Defined Behavior</h2>
  * <p>
- *   There are still a few aspects of the {@link FragmentExecutor} design that are not clear.
- *   <ol>
- *     <li>If we get a <b>Receiver Finished</b> message for one downstream receiver, will we eventually get one from every downstream receiver?</li>
- *     <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we cancel the fragment?</li>
- *     <li>What happens when we process a <b>Receiver Finished</b> message for some (but not all) downstream receivers and then we run out of data from the upstream?</li>
- *   </ol>
+ * There are still a few aspects of the {@link FragmentExecutor} design that are
+ * not clear.
+ * <ol>
+ * <li>If we get a <b>Receiver Finished</b> message for one downstream receiver,
+ * will we eventually get one from every downstream receiver?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we cancel the fragment?</li>
+ * <li>What happens when we process a <b>Receiver Finished</b> message for some
+ * (but not all) downstream receivers and then we run out of data from the
+ * upstream?</li>
+ * </ol>
  * </p>
  */
 public class FragmentExecutor implements Runnable {
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+  private static final Logger logger = LoggerFactory.getLogger(FragmentExecutor.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(FragmentExecutor.class);
 
   private final String fragmentName;
@@ -542,7 +571,7 @@ public class FragmentExecutor implements Runnable {
    * This is especially important as fragments can take longer to start
    */
   private class FragmentEventProcessor extends EventProcessor<FragmentEvent> {
-    private AtomicBoolean terminate = new AtomicBoolean(false);
+    private final AtomicBoolean terminate = new AtomicBoolean(false);
 
     void cancel() {
       sendEvent(new FragmentEvent(EventType.CANCEL, null));
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index 6c7176b..2352d0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -86,7 +86,7 @@ public class QueryBuilder {
     private QueryId queryId;
     private int recordCount;
     private int batchCount;
-    private long startTime;
+    private final long startTime;
 
     public SummaryOnlyQueryEventListener(QuerySummaryFuture future) {
       this.future = future;
@@ -132,7 +132,7 @@ public class QueryBuilder {
      * launched the query.
      */
 
-    private CountDownLatch lock = new CountDownLatch(1);
+    private final CountDownLatch lock = new CountDownLatch(1);
     private QuerySummary summary;
 
     /**
@@ -373,7 +373,7 @@ public class QueryBuilder {
 
     // Unload the batch and convert to a row set.
 
-    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
     try {
       loader.load(resultBatch.getHeader().getDef(), resultBatch.getData());
       resultBatch.release();
@@ -760,18 +760,18 @@ public class QueryBuilder {
    */
   private String queryPlan(String columnName) throws Exception {
     Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
-    final List<QueryDataBatch> results = results();
-    final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
-    final StringBuilder builder = new StringBuilder();
+    List<QueryDataBatch> results = results();
+    RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
+    StringBuilder builder = new StringBuilder();
 
-    for (final QueryDataBatch b : results) {
+    for (QueryDataBatch b : results) {
       if (!b.hasData()) {
         continue;
       }
 
       loader.load(b.getHeader().getDef(), b.getData());
 
-      final VectorWrapper<?> vw;
+      VectorWrapper<?> vw;
       try {
           vw = loader.getValueAccessorById(
               NullableVarCharVector.class,
@@ -780,9 +780,9 @@ public class QueryBuilder {
         throw new IllegalStateException("Looks like you did not provide an explain plan query, please add EXPLAIN PLAN FOR to the beginning of your query.");
       }
 
-      final ValueVector vv = vw.getValueVector();
+      ValueVector vv = vw.getValueVector();
       for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
-        final Object o = vv.getAccessor().getObject(i);
+        Object o = vv.getAccessor().getObject(i);
         builder.append(o);
       }
       loader.clear();
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index e32ecc9..d92cb5d 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -299,6 +299,15 @@ public final class ${className} extends BaseDataValueVector implements <#if type
     int bitsLength = bitsField.getBufferLength();
     SerializedField valuesField = metadata.getChild(1);
     values.load(valuesField, buffer.slice(bitsLength, capacity - bitsLength));
+    <#if type.major == "VarLen">
+
+    // Though a loaded vector should be read only,
+    // it can have its values set such as when copying
+    // with transfer pairs. Since lastSet is used when
+    // setting values, it must be set on vector load.
+
+    mutator.lastSet = accessor.getValueCount() - 1;
+    </#if>
   }
 
   @Override