Posted to commits@drill.apache.org by ar...@apache.org on 2019/06/07 10:38:50 UTC

[drill] 01/05: DRILL-7258: Remove field width limit for text reader

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 8a7007f03397849b555a74297a6a637293958cc5
Author: Paul Rogers <pa...@yahoo.com>
AuthorDate: Thu May 30 18:43:09 2019 -0700

    DRILL-7258: Remove field width limit for text reader
    
    The V2 text reader enforced a limit of 64K characters when using
    column headers, but not when using the columns[] array. The V3 reader
    enforced the 64K limit in both cases.
    
    This patch removes the limit in both cases. The effective limit is
    now the 16MB vector size limit: with headers, no single column value
    can exceed 16MB; with the columns[] array, no single row can exceed
    16MB. (The 16MB limit is imposed by the Netty memory allocator.)
    
    Added an "appendBytes()" method to the scalar column writer which adds
    additional bytes to those already written for a specific column or
    array element value. The method is implemented for VarChar, Var16Char
     and VarBinary vectors. It throws an exception for all other types.
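
    For illustration, a minimal usage sketch based on the new unit
    tests below (writer and schema setup elided):

        ScalarWriter colWriter = writer.scalar("col");
        byte[] first = "abc".getBytes();
        byte[] second = "12345".getBytes();
        // Write the first chunk, then append the rest to the same
        // column value before saving the row.
        colWriter.setBytes(first, first.length);
        colWriter.appendBytes(second, second.length);
        writer.save();   // the column value is now "abc12345"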
    
    When used with a type conversion shim, the appendBytes() method
    throws an exception. This should be acceptable: the preceding
    setBytes() call would already have failed, since a huge value is
    not valid for numeric or date type conversions.
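
    As a sketch of why this is safe, the shim's default added in this
    patch simply rejects the call (the numeric-shim scenario described
    in the comment below is an assumption for illustration):

        // Default in AbstractWriteConverter, added by this patch:
        @Override
        public void appendBytes(byte[] value, int len) {
          throw conversionError("bytes");
        }
        // A shim that parses text into an INT or DATE would already
        // have rejected a multi-megabyte value in setBytes(), so this
        // append path is not reached through a conversion shim.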
    
    Added unit tests for the append feature, including the batch
    overflow case (when appending bytes causes the vector or batch to
    overflow). Also added tests to verify that the text reader no
    longer enforces a column width limit, both with and without
    headers.
    
    closes #1802
---
 .../exec/physical/rowSet/impl/WriterIndexImpl.java |  3 +
 .../easy/text/compliant/v3/BaseFieldOutput.java    | 52 ++++++++++---
 .../easy/text/compliant/v3/FieldVarCharOutput.java | 12 +--
 .../text/compliant/v3/RepeatedVarCharOutput.java   |  7 +-
 .../store/easy/text/compliant/v3/TextInput.java    |  9 +--
 .../rowSet/impl/TestResultSetLoaderOverflow.java   | 76 +++++++++++++++++-
 .../store/easy/text/compliant/BaseCsvTest.java     | 20 +++++
 .../easy/text/compliant/TestCsvWithHeaders.java    | 25 ++++++
 .../easy/text/compliant/TestCsvWithoutHeaders.java | 29 +++++++
 .../apache/drill/test/rowSet/RowSetWriterImpl.java |  3 +
 .../drill/test/rowSet/test/PerformanceTool.java    |  5 +-
 .../test/rowSet/test/TestFixedWidthWriter.java     |  3 +
 .../test/rowSet/test/TestScalarAccessors.java      | 89 ++++++++++++++++++++++
 .../main/codegen/templates/ColumnAccessors.java    | 11 +++
 .../exec/vector/accessor/ColumnWriterIndex.java    | 13 +++-
 .../drill/exec/vector/accessor/ScalarWriter.java   |  1 +
 .../accessor/convert/AbstractWriteConverter.java   |  5 ++
 .../accessor/writer/AbstractArrayWriter.java       |  7 +-
 .../vector/accessor/writer/BaseScalarWriter.java   |  5 ++
 .../vector/accessor/writer/BaseVarWidthWriter.java |  7 ++
 .../exec/vector/accessor/writer/MapWriter.java     |  1 +
 .../accessor/writer/NullableScalarWriter.java      |  8 ++
 .../accessor/writer/OffsetVectorWriterImpl.java    |  6 ++
 .../vector/accessor/writer/ScalarArrayWriter.java  |  3 +
 .../accessor/writer/dummy/DummyScalarWriter.java   |  3 +
 25 files changed, 376 insertions(+), 27 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
index 9fb3e4e..6119791 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/impl/WriterIndexImpl.java
@@ -106,6 +106,9 @@ class WriterIndexImpl implements ColumnWriterIndex {
   public void nextElement() { }
 
   @Override
+  public void prevElement() { }
+
+  @Override
   public ColumnWriterIndex outerIndex() { return null; }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
index 6bf0bb6..5dd4284 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/BaseFieldOutput.java
@@ -17,13 +17,17 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant.v3;
 
-import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 public abstract class BaseFieldOutput extends TextOutput {
 
-  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseFieldOutput.class);
-  private static final int MAX_FIELD_LENGTH = 1024 * 64;
+  /**
+   * Width of the per-field data buffer. Fields can be larger.
+   * In that case, subsequent buffers are appended to the vector
+   * to form the full field.
+   */
+  private static final int BUFFER_LEN = 1024;
 
   // track which field is getting appended
   protected int currentFieldIndex = -1;
@@ -31,6 +35,8 @@ public abstract class BaseFieldOutput extends TextOutput {
   protected int currentDataPointer;
   // track if field is still getting appended
   private boolean fieldOpen = true;
+  // number of bytes written to field thus far
+  protected int fieldWriteCount;
   // holds chars for a field
   protected byte[] fieldBytes;
   protected final RowSetLoader writer;
@@ -84,7 +90,7 @@ public abstract class BaseFieldOutput extends TextOutput {
     // If we project at least one field, allocate a buffer.
 
     if (maxField >= 0) {
-      fieldBytes = new byte[MAX_FIELD_LENGTH];
+      fieldBytes = new byte[BUFFER_LEN];
     }
   }
 
@@ -104,6 +110,7 @@ public abstract class BaseFieldOutput extends TextOutput {
     assert index == currentFieldIndex + 1;
     currentFieldIndex = index;
     currentDataPointer = 0;
+    fieldWriteCount = 0;
     fieldOpen = true;
 
     // Figure out if this field is projected.
@@ -122,18 +129,41 @@ public abstract class BaseFieldOutput extends TextOutput {
     if (! fieldProjected) {
       return;
     }
-    if (currentDataPointer >= MAX_FIELD_LENGTH - 1) {
-      throw UserException
-          .unsupportedError()
-          .message("Text column is too large.")
-          .addContext("Column", currentFieldIndex)
-          .addContext("Limit", MAX_FIELD_LENGTH)
-          .build(logger);
+    if (currentDataPointer >= BUFFER_LEN - 1) {
+      writeToVector();
     }
 
     fieldBytes[currentDataPointer++] = data;
   }
 
+
+  /**
+   * Write a buffer of data to the underlying vector using the
+   * column writer. The buffer holds a complete or partial chunk
+   * of data for the field. If this is the first data for the field,
+   * write the bytes. If this is a second buffer for the same field,
+   * append the bytes. The append will work if the underlying vector
+   * is VarChar; it will fail if a type conversion shim is in between.
+   * (This is generally OK because the previous setBytes should have
+   * failed because a large int or date is not supported.)
+   */
+
+  protected void writeToVector() {
+    if (!fieldProjected) {
+      return;
+    }
+    ScalarWriter colWriter = columnWriter();
+    if (fieldWriteCount == 0) {
+      colWriter.setBytes(fieldBytes, currentDataPointer);
+    } else {
+      colWriter.appendBytes(fieldBytes, currentDataPointer);
+    }
+    fieldWriteCount += currentDataPointer;
+    currentDataPointer = 0;
+  }
+
+  protected abstract ScalarWriter columnWriter();
+
   @Override
   public boolean endField() {
     fieldOpen = false;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
index df48a55..482c5cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/FieldVarCharOutput.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.easy.text.compliant.v3;
 
 import org.apache.drill.exec.physical.rowSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
 
 /**
  * Class is responsible for generating record batches for text file inputs. We generate
@@ -52,11 +53,12 @@ class FieldVarCharOutput extends BaseFieldOutput {
 
   @Override
   public boolean endField() {
-    if (fieldProjected) {
-      writer.scalar(currentFieldIndex)
-        .setBytes(fieldBytes, currentDataPointer);
-    }
-
+    writeToVector();
     return super.endField();
   }
+
+  @Override
+  protected ScalarWriter columnWriter() {
+    return writer.scalar(currentFieldIndex);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
index 13b4450..f7f1035 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/RepeatedVarCharOutput.java
@@ -120,7 +120,7 @@ public class RepeatedVarCharOutput extends BaseFieldOutput {
 
       // Save the field.
 
-      columnWriter.setBytes(fieldBytes, currentDataPointer);
+      writeToVector();
     } else {
 
       // The field is not projected.
@@ -134,4 +134,9 @@ public class RepeatedVarCharOutput extends BaseFieldOutput {
 
     return super.endField();
   }
+
+  @Override
+  protected ScalarWriter columnWriter() {
+    return columnWriter;
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
index 26fade6..951bc81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/compliant/v3/TextInput.java
@@ -17,22 +17,21 @@
  */
 package org.apache.drill.exec.store.easy.text.compliant.v3;
 
-import io.netty.buffer.DrillBuf;
-import io.netty.util.internal.PlatformDependent;
+import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
 
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-
-import static org.apache.drill.exec.memory.BoundsChecking.rangeCheck;
+import io.netty.buffer.DrillBuf;
+import io.netty.util.internal.PlatformDependent;
 
 /**
  * Class that fronts an InputStream to provide a byte consumption interface.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
index a82e3c3..3f0989d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/rowSet/impl/TestResultSetLoaderOverflow.java
@@ -37,12 +37,12 @@ import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 import org.apache.drill.test.SubOperatorTest;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSetReader;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
 
 /**
  * Exercise the vector overflow functionality for the result set loader.
@@ -706,4 +706,78 @@ public class TestResultSetLoaderOverflow extends SubOperatorTest {
 
     rsLoader.close();
   }
+
+  @Test
+  public void testVectorSizeLimitWithAppend() {
+    TupleMetadata schema = new SchemaBuilder()
+        .add("s", MinorType.VARCHAR)
+        .buildSchema();
+    ResultSetOptions options = new OptionBuilder()
+        .setRowCountLimit(ValueVector.MAX_ROW_COUNT)
+        .setSchema(schema)
+        .build();
+    ResultSetLoader rsLoader = new ResultSetLoaderImpl(fixture.allocator(), options);
+    RowSetLoader rootWriter = rsLoader.writer();
+
+    rsLoader.startBatch();
+    byte head[] = "abc".getBytes();
+    byte tail[] = new byte[523];
+    Arrays.fill(tail, (byte) 'X');
+    int count = 0;
+    ScalarWriter colWriter = rootWriter.scalar(0);
+    while (! rootWriter.isFull()) {
+      rootWriter.start();
+      colWriter.setBytes(head, head.length);
+      colWriter.appendBytes(tail, tail.length);
+      colWriter.appendBytes(tail, tail.length);
+      rootWriter.save();
+      count++;
+    }
+
+    // Number of rows should be driven by vector size.
+    // Our row count should include the overflow row
+
+    int valueLength = head.length + 2 * tail.length;
+    int expectedCount = ValueVector.MAX_BUFFER_SIZE / valueLength;
+    assertEquals(expectedCount + 1, count);
+
+    // Loader's row count should include only "visible" rows
+
+    assertEquals(expectedCount, rootWriter.rowCount());
+
+    // Total count should include invisible and look-ahead rows.
+
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+
+    // Result should exclude the overflow row
+
+    RowSet result = fixture.wrap(rsLoader.harvest());
+    assertEquals(expectedCount, result.rowCount());
+
+    // Verify that the values were, in fact, appended.
+
+    String expected = new String(head, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+    expected += new String(tail, Charsets.UTF_8);
+    RowSetReader reader = result.reader();
+    while (reader.next()) {
+      assertEquals(expected, reader.scalar(0).getString());
+    }
+    result.clear();
+
+    // Next batch should start with the overflow row
+
+    rsLoader.startBatch();
+    assertEquals(1, rootWriter.rowCount());
+    assertEquals(expectedCount + 1, rsLoader.totalRowCount());
+    result = fixture.wrap(rsLoader.harvest());
+    assertEquals(1, result.rowCount());
+    reader = result.reader();
+    while (reader.next()) {
+      assertEquals(expected, reader.scalar(0).getString());
+    }
+    result.clear();
+
+    rsLoader.close();
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index c2aeac6..2819aa8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -29,6 +29,8 @@ import org.apache.drill.test.ClusterTest;
 
 public class BaseCsvTest extends ClusterTest {
 
+  protected final int BIG_COL_SIZE = 70_000;
+
   protected static final String PART_DIR = "root";
   protected static final String NESTED_DIR = "nested";
   protected static final String ROOT_FILE = "first.csv";
@@ -118,4 +120,22 @@ public class BaseCsvTest extends ClusterTest {
       }
     }
   }
+  protected String buildBigColFile(boolean withHeader) throws IOException {
+    String fileName = "hugeCol.csv";
+    try(PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
+      if (withHeader) {
+        out.println("id,big,n");
+      }
+      for (int i = 0; i < 10; i++) {
+        out.print(i + 1);
+        out.print(",");
+        for (int j = 0; j < BIG_COL_SIZE; j++) {
+          out.print((char) ((j + i) % 26 + 'A'));
+        }
+        out.print(",");
+        out.println((i + 1) * 10);
+      }
+    }
+    return fileName;
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index 784c4be..645af30 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -981,4 +981,29 @@ public class TestCsvWithHeaders extends BaseCsvTest {
       resetV3();
     }
   }
+
+  @Test
+  public void testHugeColumn() throws IOException {
+    String fileName = buildBigColFile(true);
+    try {
+      enableV3(true);
+      String sql = "SELECT * FROM `dfs.data`.`%s`";
+      RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+      assertEquals(10, actual.rowCount());
+      RowSetReader reader = actual.reader();
+      while (reader.next()) {
+        int i = reader.logicalIndex();
+        assertEquals(Integer.toString(i + 1), reader.scalar(0).getString());
+        String big = reader.scalar(1).getString();
+        assertEquals(BIG_COL_SIZE, big.length());
+        for (int j = 0; j < BIG_COL_SIZE; j++) {
+          assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
+        }
+        assertEquals(Integer.toString((i + 1) * 10), reader.scalar(2).getString());
+      }
+      actual.clear();
+    } finally {
+      resetV3();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
index ec6810d..2d68a01 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithoutHeaders.java
@@ -442,4 +442,33 @@ public class TestCsvWithoutHeaders extends BaseCsvTest {
       resetV3();
     }
   }
+
+  @Test
+  public void testHugeColumn() throws IOException {
+    String fileName = buildBigColFile(false);
+    try {
+      enableV3(true);
+      String sql = "SELECT * FROM `dfs.data`.`%s`";
+      RowSet actual = client.queryBuilder().sql(sql, fileName).rowSet();
+      assertEquals(10, actual.rowCount());
+      RowSetReader reader = actual.reader();
+      ArrayReader arrayReader = reader.array(0);
+      while (reader.next()) {
+        int i = reader.logicalIndex();
+        arrayReader.next();
+        assertEquals(Integer.toString(i + 1), arrayReader.scalar().getString());
+        arrayReader.next();
+        String big = arrayReader.scalar().getString();
+        assertEquals(BIG_COL_SIZE, big.length());
+        for (int j = 0; j < BIG_COL_SIZE; j++) {
+          assertEquals((char) ((j + i) % 26 + 'A'), big.charAt(j));
+        }
+        arrayReader.next();
+        assertEquals(Integer.toString((i + 1) * 10), arrayReader.scalar().getString());
+      }
+      actual.clear();
+    } finally {
+      resetV3();
+    }
+  }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
index 8ba1f93..6512d62 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/RowSetWriterImpl.java
@@ -81,6 +81,9 @@ public class RowSetWriterImpl extends AbstractTupleWriter implements RowSetWrite
     public final void nextElement() { }
 
     @Override
+    public final void prevElement() { }
+
+    @Override
     public void rollover() {
       throw new UnsupportedOperationException("Rollover not supported in the row set writer.");
     }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index fa92c09..c810f93 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -34,8 +34,8 @@ import org.apache.drill.exec.vector.accessor.ColumnWriterIndex;
 import org.apache.drill.exec.vector.accessor.writer.AbstractArrayWriter.ArrayObjectWriter;
 import org.apache.drill.exec.vector.accessor.writer.NullableScalarWriter;
 import org.apache.drill.exec.vector.accessor.writer.ScalarArrayWriter;
-import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.test.OperatorFixture;
 
 /**
  * Tests the performance of the writers compared to using the value
@@ -180,6 +180,9 @@ public class PerformanceTool {
     public final void nextElement() { index++; }
 
     @Override
+    public final void prevElement() { }
+
+    @Override
     public void rollover() { }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java
index 3eba578..f7304e9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestFixedWidthWriter.java
@@ -56,6 +56,9 @@ public class TestFixedWidthWriter extends SubOperatorTest {
     public void nextElement() { }
 
     @Override
+    public void prevElement() { }
+
+    @Override
     public void rollover() { }
 
     @Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java
index 582c2f4..cb11af0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestScalarAccessors.java
@@ -18,6 +18,7 @@
 package org.apache.drill.test.rowSet.test;
 
 import static org.apache.drill.test.rowSet.RowSetUtilities.dec;
+import static org.apache.drill.test.rowSet.RowSetUtilities.strArray;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -39,14 +40,19 @@ import org.apache.drill.exec.vector.DateUtilities;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.accessor.ArrayReader;
+import org.apache.drill.exec.vector.accessor.ArrayWriter;
 import org.apache.drill.exec.vector.accessor.ScalarReader;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.exec.vector.accessor.ValueType;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.drill.test.SubOperatorTest;
+import org.apache.drill.test.rowSet.DirectRowSet;
+import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.SingleRowSet;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Instant;
 import org.joda.time.LocalDate;
@@ -1774,4 +1780,87 @@ public class TestScalarAccessors extends SubOperatorTest {
     }
     rsb.build().clear();
   }
+
+  /**
+   * Test the ability to append bytes to a VarChar column. Should work for
+   * Var16Char, but that type is not yet supported in Drill.
+   */
+
+  @Test
+  public void testAppend() {
+    doTestAppend(new SchemaBuilder()
+        .add("col", MinorType.VARCHAR)
+        .buildSchema());
+    doTestAppend(new SchemaBuilder()
+        .addNullable("col", MinorType.VARCHAR)
+        .buildSchema());
+  }
+
+  private void doTestAppend(TupleMetadata schema) {
+    DirectRowSet rs = DirectRowSet.fromSchema(fixture.allocator(), schema);
+    RowSetWriter writer = rs.writer(100);
+    ScalarWriter colWriter = writer.scalar("col");
+
+    byte first[] = "abc".getBytes();
+    byte second[] = "12345".getBytes();
+    colWriter.setBytes(first, first.length);
+    colWriter.appendBytes(second, second.length);
+    writer.save();
+    colWriter.setBytes(second, second.length);
+    colWriter.appendBytes(first, first.length);
+    writer.save();
+    colWriter.setBytes(first, first.length);
+    colWriter.appendBytes(second, second.length);
+    writer.save();
+    RowSet actual = writer.done();
+
+    RowSet expected = new RowSetBuilder(fixture.allocator(), schema)
+        .addSingleCol("abc12345")
+        .addSingleCol("12345abc")
+        .addSingleCol("abc12345")
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
+
+  /**
+   * Test the ability to append bytes to a VarChar column. Should work for
+   * Var16Char, but that type is not yet supported in Drill.
+   */
+
+  @Test
+  public void testAppendWithArray() {
+    TupleMetadata schema = new SchemaBuilder()
+        .addArray("col", MinorType.VARCHAR)
+        .buildSchema();
+
+    DirectRowSet rs = DirectRowSet.fromSchema(fixture.allocator(), schema);
+    RowSetWriter writer = rs.writer(100);
+    ArrayWriter arrayWriter = writer.array("col");
+    ScalarWriter colWriter = arrayWriter.scalar();
+
+    byte first[] = "abc".getBytes();
+    byte second[] = "12345".getBytes();
+    for (int i = 0; i < 3; i++) {
+      colWriter.setBytes(first, first.length);
+      colWriter.appendBytes(second, second.length);
+      arrayWriter.save();
+      colWriter.setBytes(second, second.length);
+      colWriter.appendBytes(first, first.length);
+      arrayWriter.save();
+      colWriter.setBytes(first, first.length);
+      colWriter.appendBytes(second, second.length);
+      arrayWriter.save();
+      writer.save();
+    }
+    RowSet actual = writer.done();
+
+    RowSet expected = new RowSetBuilder(fixture.allocator(), schema)
+        .addSingleCol(strArray("abc12345", "12345abc", "abc12345"))
+        .addSingleCol(strArray("abc12345", "12345abc", "abc12345"))
+        .addSingleCol(strArray("abc12345", "12345abc", "abc12345"))
+        .build();
+
+    RowSetUtilities.verify(expected, actual);
+  }
 }
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 0891e13..79f352a 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -415,6 +415,17 @@ public class ColumnAccessors {
       buf.writerIndex(VALUE_WIDTH);
     }
     </#if>
+    
+    <#if drillType == "VarChar" || drillType == "Var16Char" || drillType == "VarBinary">
+    @Override
+    public final void appendBytes(final byte[] value, final int len) {
+      vectorIndex.prevElement();
+      final int offset = prepareAppend(len);
+      drillBuf.setBytes(offset, value, 0, len);
+      offsetsWriter.reviseOffset(offset + len);
+      vectorIndex.nextElement();
+    }
+    </#if>
     <#if drillType == "VarChar">
 
     @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java
index 7e225c9..cdeb0df 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ColumnWriterIndex.java
@@ -48,14 +48,23 @@ public interface ColumnWriterIndex {
   int vectorIndex();
 
   /**
-   * Index for array elements that allows the caller to increment the
-   * index. For arrays, writing (or saving) one value automatically
+   * Increment the index for an array.
+   * For arrays, writing (or saving) one value automatically
    * moves to the next value. Ignored for non-element indexes.
    */
 
   void nextElement();
 
   /**
+   * Decrement the index for an array. Used exclusively for
+   * appending bytes to a VarChar, Var16Char or VarBinary
+   * column. Assumed to be followed by another call
+   * to nextElement().
+   */
+
+  void prevElement();
+
+  /**
    * When handling overflow, the index must be reset so that the current row
    * starts at the start of the vector. Relative offsets must be preserved.
    * (That is, if the current write position for an array is four greater than
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
index 55a645e..44a4847 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/ScalarWriter.java
@@ -70,6 +70,7 @@ public interface ScalarWriter extends ColumnWriter {
   void setDouble(double value);
   void setString(String value);
   void setBytes(byte[] value, int len);
+  void appendBytes(byte[] value, int len);
   void setDecimal(BigDecimal value);
   void setPeriod(Period value);
   void setDate(LocalDate value);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
index b98e8e0..f92eed6 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/convert/AbstractWriteConverter.java
@@ -110,6 +110,11 @@ public abstract class AbstractWriteConverter extends AbstractScalarWriter {
   }
 
   @Override
+  public void appendBytes(byte[] value, int len) {
+    throw conversionError("bytes");
+  }
+
+  @Override
   public void setDecimal(BigDecimal value) {
     baseWriter.setDecimal(value);
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
index 1b42169..b8ec266 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractArrayWriter.java
@@ -144,7 +144,12 @@ public abstract class AbstractArrayWriter implements ArrayWriter, WriterEvents {
     @Override
     public void nextElement() { }
 
-    public void next() { elementIndex++; }
+    @Override
+    public void prevElement() { }
+
+    protected void next() { elementIndex++; }
+
+    protected void prev() { elementIndex--; }
 
     public int valueStartOffset() { return offsetsWriter.nextOffset(); }
 
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
index 0083ece..8dc85cf 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseScalarWriter.java
@@ -264,6 +264,11 @@ public abstract class BaseScalarWriter extends AbstractScalarWriterImpl {
   }
 
   @Override
+  public void appendBytes(byte[] value, int len) {
+    throw conversionError("bytes");
+  }
+
+  @Override
   public void setDecimal(BigDecimal value) {
     throw conversionError("Decimal");
   }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
index 70de95a..0bac916 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/BaseVarWidthWriter.java
@@ -85,6 +85,13 @@ public abstract class BaseVarWidthWriter extends BaseScalarWriter {
     return offsetsWriter.nextOffset;
   }
 
+  protected final int prepareAppend(final int width) {
+    // No fill empties needed: must have been done
+    // on previous setBytes() call.
+
+    return writeOffset(width);
+  }
+
   @Override
   protected final void setBuffer() {
     drillBuf = vector().getBuffer();
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
index 82e90e9..fd6e7c4 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/MapWriter.java
@@ -51,6 +51,7 @@ public abstract class MapWriter extends AbstractTupleWriter {
     @Override public int rowStartIndex() { return baseIndex.rowStartIndex(); }
     @Override public int vectorIndex() { return baseIndex.vectorIndex(); }
     @Override public void nextElement() { }
+    @Override public void prevElement() { }
     @Override public void rollover() { }
 
     @Override public ColumnWriterIndex outerIndex() {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index be3a3e4..856a44b 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -60,6 +60,9 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
     }
 
     @Override
+    public void prevElement() { }
+
+    @Override
     public void rollover() {
       parentIndex.rollover();
     }
@@ -180,6 +183,11 @@ public class NullableScalarWriter extends AbstractScalarWriterImpl {
   }
 
   @Override
+  public void appendBytes(byte[] value, int len) {
+    baseWriter.appendBytes(value, len);
+  }
+
+  @Override
   public void setDecimal(BigDecimal value) {
     baseWriter.setDecimal(value);
     isSetWriter.setInt(1);
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
index f4ee0ab..1da362a 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriterImpl.java
@@ -253,6 +253,12 @@ public class OffsetVectorWriterImpl extends AbstractFixedWidthWriter implements
     nextOffset = newOffset;
   }
 
+  public final void reviseOffset(final int newOffset) {
+    final int writeIndex = vectorIndex.vectorIndex() + 1;
+    drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
+    nextOffset = newOffset;
+  }
+
   public final void fillOffset(final int newOffset) {
     drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, newOffset);
     nextOffset = newOffset;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
index 8bacdf4..4df8721 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/ScalarArrayWriter.java
@@ -60,6 +60,9 @@ public class ScalarArrayWriter extends BaseArrayWriter {
 
     @Override
     public final void nextElement() { next(); }
+
+    @Override
+    public final void prevElement() { prev(); }
   }
 
   private final ScalarWriter elementWriter;
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
index 852bd0d..2d52c3e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/dummy/DummyScalarWriter.java
@@ -72,6 +72,9 @@ public class DummyScalarWriter extends AbstractScalarWriterImpl {
   public void setBytes(byte[] value, int len) { }
 
   @Override
+  public void appendBytes(byte[] value, int len) { }
+
+  @Override
   public void setDecimal(BigDecimal value) { }
 
   @Override