Posted to commits@drill.apache.org by bo...@apache.org on 2018/01/31 03:56:37 UTC

[1/2] drill git commit: DRILL-6080: Sort incorrectly limits batch size to 65535 records

Repository: drill
Updated Branches:
  refs/heads/master cee67de2e -> ee9e613d7


DRILL-6080: Sort incorrectly limits batch size to 65535 records

closes #1090

* Sort incorrectly limits batch size to 65535 records rather than 65536 (see the sketch below).
* This PR also includes a few code cleanup items.
* Fixes an overflow in the offset vector in the row set writer.
* Updates the performance tool.
* Replaces "unsafe" methods with "set" methods.
* Also fixes an indexing issue with nullable writers.
* Removes debug and timing code.
* Increases the strictness of batch-size checks.
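
For context (editor's note, not part of the commit message): a minimal standalone sketch of the off-by-one being fixed. MAX_ROW_COUNT below is a hypothetical stand-in assumed to equal Drill's ValueVector.MAX_ROW_COUNT, i.e. 1 << 16 (65536), consistent with the commit title.

// Illustrates why Character.MAX_VALUE (65535) is the wrong batch limit:
// a full 65536-row batch no longer fits under it.
public class BatchLimitSketch {
  // Hypothetical stand-in for ValueVector.MAX_ROW_COUNT.
  static final int MAX_ROW_COUNT = 1 << 16;                // 65536

  public static void main(String[] args) {
    int oldLimit = Character.MAX_VALUE;                     // 65535
    int fullBatch = 1 << 16;                                // 65536 rows
    System.out.println("old limit: " + oldLimit);           // 65535
    System.out.println("new limit: " + MAX_ROW_COUNT);      // 65536
    System.out.println("full batch fits under old limit? "
        + (fullBatch <= oldLimit));                         // false
    System.out.println("full batch fits under new limit? "
        + (fullBatch <= MAX_ROW_COUNT));                    // true
  }
}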


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f0d00c62
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f0d00c62
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f0d00c62

Branch: refs/heads/master
Commit: f0d00c62b594e424ea085ebd0a5be26f0f509fda
Parents: 039530a
Author: Paul Rogers <pr...@maprtech.com>
Authored: Wed Jan 10 16:04:53 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 19:54:57 2018 -0800

----------------------------------------------------------------------
 .../impl/sort/SortRecordBatchBuilder.java       |  6 +-
 .../exec/physical/impl/xsort/MSortTemplate.java |  9 +-
 .../exec/record/selection/SelectionVector4.java |  4 +-
 .../impl/xsort/managed/TestSortImpl.java        | 72 ++++++++--------
 .../drill/test/rowSet/test/PerformanceTool.java | 10 +++
 .../rowSet/test/TestVariableWidthWriter.java    |  2 +-
 .../src/main/java/io/netty/buffer/DrillBuf.java | 86 --------------------
 .../main/codegen/templates/ColumnAccessors.java | 24 +++---
 .../writer/AbstractFixedWidthWriter.java        |  2 +-
 .../accessor/writer/NullableScalarWriter.java   | 51 +++++++++++-
 .../accessor/writer/OffsetVectorWriter.java     | 36 +++++---
 11 files changed, 145 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 6c66c01..d995902 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -105,7 +105,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
       return;
     }
 
-    if(runningBatches >= Character.MAX_VALUE) {
+    if (runningBatches >= Character.MAX_VALUE) {
       final String errMsg = String.format("Tried to add more than %d number of batches.", (int) Character.MAX_VALUE);
       logger.error(errMsg);
       throw new DrillRuntimeException(errMsg);
@@ -152,7 +152,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
     if (svBuffer == null) {
       throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
     }
-    sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
+    sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
     BatchSchema schema = batches.keySet().iterator().next();
     List<RecordBatchData> data = batches.get(schema);
 
@@ -174,7 +174,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
       int recordBatchId = 0;
       for (RecordBatchData d : data) {
         for (int i = 0; i < d.getRecordCount(); i++, index++) {
-          sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+          sv4.set(index, recordBatchId, d.getSv2().getIndex(i));
         }
         // might as well drop the selection vector since we'll stop using it now.
         d.getSv2().clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b69170..afbc58b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -32,10 +32,10 @@ import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.util.IndexedSortable;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
 import com.google.common.collect.Queues;
 
 public abstract class MSortTemplate implements MSorter, IndexedSortable {
@@ -43,7 +43,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
 
   private SelectionVector4 vector4;
   private SelectionVector4 aux;
-  private long compares;
   private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
   private FragmentContext context;
 
@@ -74,13 +73,14 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
         throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch));
       }
     }
+    @SuppressWarnings("resource")
     final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
 
     try {
       desiredRecordBatchCount = context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE);
     } catch(ConfigException.Missing e) {
       // value not found, use default value instead
-      desiredRecordBatchCount = Character.MAX_VALUE;
+      desiredRecordBatchCount = ValueVector.MAX_ROW_COUNT;
     }
     aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount);
   }
@@ -126,7 +126,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
 
   @Override
   public void sort(final VectorContainer container) {
-    final Stopwatch watch = Stopwatch.createStarted();
     while (runStarts.size() > 1) {
 
       // check if we're cancelled/failed frequently
@@ -153,6 +152,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
       if (outIndex < vector4.getTotalCount()) {
         copyRun(outIndex, vector4.getTotalCount());
       }
+      @SuppressWarnings("resource")
       final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount);
       aux.clear();
       aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount);
@@ -181,7 +181,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
   public int compare(final int leftIndex, final int rightIndex) {
     final int sv1 = vector4.get(leftIndex);
     final int sv2 = vector4.get(rightIndex);
-    compares++;
     try {
       return doEval(sv1, sv2);
     } catch (SchemaChangeException e) {

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index b51fdca..2c10d6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -112,8 +112,8 @@ public class SelectionVector4 implements AutoCloseable {
       return false;
     }
 
-    start = start+length;
-    int newEnd = Math.min(start+length, recordCount);
+    start = start + length;
+    int newEnd = Math.min(start + length, recordCount);
     length = newEnd - start;
 //    logger.debug("New start {}, new length {}", start, length);
     return true;

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index cdca30e..a985478 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.categories.OperatorTest;
 import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.DirectRowSet;
@@ -45,16 +46,18 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl;
 import org.apache.drill.test.rowSet.IndirectRowSet;
 import org.apache.drill.test.rowSet.RowSet;
 import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSetReader;
-import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.RowSetBuilder;
 import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import com.google.common.base.Stopwatch;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import org.junit.experimental.categories.Category;
+
+import io.netty.buffer.DrillBuf;
 
 /**
  * Tests the external sort implementation: the "guts" of the sort stripped of the
@@ -157,12 +160,6 @@ public class TestSortImpl extends DrillTest {
       for (RowSet expectedSet : expected) {
         assertTrue(results.next());
         RowSet rowSet = toRowSet(results, dest);
-        // Uncomment these for debugging. Leave them commented otherwise
-        // to avoid polluting the Maven build output unnecessarily.
-//        System.out.println("Expected:");
-//        expectedSet.print();
-//        System.out.println("Actual:");
-//        rowSet.print();
         new RowSetComparison(expectedSet)
               .verify(rowSet);
         expectedSet.clear();
@@ -325,7 +322,8 @@ public class TestSortImpl extends DrillTest {
     public DataGenerator(OperatorFixture fixture, int targetCount, int batchSize, int seed, int step) {
       this.fixture = fixture;
       this.targetCount = targetCount;
-      this.batchSize = Math.min(batchSize, Character.MAX_VALUE);
+      Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT);
+      this.batchSize = batchSize;
       this.step = step;
       schema = SortTestUtilities.nonNullSchema();
       currentValue = seed;
@@ -380,7 +378,8 @@ public class TestSortImpl extends DrillTest {
 
     public DataValidator(int targetCount, int batchSize) {
       this.targetCount = targetCount;
-      this.batchSize = Math.min(batchSize, Character.MAX_VALUE);
+      Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT);
+      this.batchSize = batchSize;
     }
 
     public void validate(RowSet output) {
@@ -400,8 +399,6 @@ public class TestSortImpl extends DrillTest {
     }
   }
 
-  Stopwatch timer = Stopwatch.createUnstarted();
-
   /**
    * Run a full-blown sort test with multiple input batches. Because we want to
    * generate multiple inputs, we don't create them statically. Instead, we generate
@@ -428,33 +425,25 @@ public class TestSortImpl extends DrillTest {
       if (batchCount == 1) {
         // Simulates a NEW_SCHEMA event
 
-        timer.start();
         sort.setSchema(input.container().getSchema());
-        timer.stop();
       }
 
       // Simulates an OK event
 
-      timer.start();
       sort.addBatch(input.vectorAccessible());
-      timer.stop();
     }
 
     // Simulate returning results
 
-    timer.start();
     SortResults results = sort.startMerge();
     if (results.getContainer() != dest) {
       dest.clear();
       dest = results.getContainer();
     }
     while (results.next()) {
-      timer.stop();
       RowSet output = toRowSet(results, dest);
       validator.validate(output);
-      timer.start();
     }
-    timer.stop();
     validator.validateDone();
     results.close();
     dest.clear();
@@ -469,11 +458,9 @@ public class TestSortImpl extends DrillTest {
    */
 
   public void runJumboBatchTest(OperatorFixture fixture, int rowCount) {
-    timer.reset();
-    DataGenerator dataGen = new DataGenerator(fixture, rowCount, Character.MAX_VALUE);
-    DataValidator validator = new DataValidator(rowCount, Character.MAX_VALUE);
+    DataGenerator dataGen = new DataGenerator(fixture, rowCount, ValueVector.MAX_ROW_COUNT);
+    DataValidator validator = new DataValidator(rowCount, ValueVector.MAX_ROW_COUNT);
     runLargeSortTest(fixture, dataGen, validator);
-    System.out.println(timer.elapsed(TimeUnit.MILLISECONDS));
   }
 
   /**
@@ -499,7 +486,30 @@ public class TestSortImpl extends DrillTest {
   @Test
   public void testLargeBatch() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
-      runJumboBatchTest(fixture, Character.MAX_VALUE);
+//      partyOnMemory(fixture.allocator());
+      runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT);
+    }
+  }
+
+  /**
+   * Use this function to pre-load Netty's free list with a large
+   * number of "dirty" blocks. This will often catch errors due to
+   * failure to initialize value vector memory.
+   *
+   * @param allocator the allocator used to allocate and dirty the buffers
+   */
+
+  @SuppressWarnings("unused")
+  private void partyOnMemory(BufferAllocator allocator) {
+    DrillBuf bufs[] = new DrillBuf[10];
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i] = allocator.buffer(ValueVector.MAX_BUFFER_SIZE);
+      for (int j = 0; j < ValueVector.MAX_BUFFER_SIZE; j += 4) {
+        bufs[i].setInt(j, 0xDEADBEEF);
+      }
+    }
+    for (int i = 0; i < bufs.length; i++) {
+      bufs[i].release();
     }
   }
 
@@ -533,8 +543,6 @@ public class TestSortImpl extends DrillTest {
 
     VectorContainer dest = new VectorContainer();
     SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
-    timer.reset();
-    timer.start();
     sort.setSchema(rowSet.container().getSchema());
     sort.addBatch(rowSet.vectorAccessible());
     SortResults results = sort.startMerge();
@@ -543,13 +551,11 @@ public class TestSortImpl extends DrillTest {
       dest = results.getContainer();
     }
     assertTrue(results.next());
-    timer.stop();
     assertFalse(results.next());
     results.close();
     dest.clear();
     sort.close();
     sort.opContext().close();
-    System.out.println(timer.elapsed(TimeUnit.MILLISECONDS));
   }
 
   /**
@@ -561,7 +567,7 @@ public class TestSortImpl extends DrillTest {
   @Test
   public void testWideRows() throws Exception {
     try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
-      runWideRowsTest(fixture, 1000, Character.MAX_VALUE);
+      runWideRowsTest(fixture, 1000, ValueVector.MAX_ROW_COUNT);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index e84f2d3..ca282a1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -47,6 +47,16 @@ import com.google.common.base.Stopwatch;
  * <p>
  * Current results:
  * <ul>
+ * <li>Required and nullable writers are slightly faster than the
+ * corresponding vector mutator methods.</li>
+ * <li>Writer is 230% faster than a repeated mutator.</li>
+ * </ul>
+ *
+ * The key reason for the converged performance (now compared to earlier
+ * results below) is that both paths now use bounds-checking optimizations.
+ * <p>
+ * Prior results before the common bounds-check optimizations:
+ * <ul>
  * <li>Writer is 42% faster than a required mutator.</li>
  * <li>Writer is 73% faster than a nullable mutator.</li>
  * <li>Writer is 407% faster than a repeated mutator.</li>

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
index 103b212..912f362 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
@@ -373,7 +373,7 @@ public class TestVariableWidthWriter extends SubOperatorTest {
 
         @Override
         public boolean canExpand(ScalarWriter writer, int delta) {
-          System.out.println("Delta: " + delta);
+//          System.out.println("Delta: " + delta);
           totalAlloc += delta;
           return totalAlloc < 1024 * 1024;
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 115d31e..109500a 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -712,25 +712,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
     return this;
   }
 
-  // Clone of UDLE's setBytes(), but with bounds checking done as a boolean,
-  // not assertion.
-
-  public boolean setBytesBounded(int index, byte[] src, int srcIndex, int length) {
-    // Must do here because Drill's UDLE is not ref counted.
-    // Done as an assert to avoid production overhead: if this is going
-    // to fail, it will do so spectacularly in tests, due to a programming error.
-    assert refCnt() > 0;
-    return udle.setBytesBounded(index, src, srcIndex, length);
-  }
-
-  // As above, but for direct memory.
-
-  public boolean setBytesBounded(int index, DrillBuf src, int srcIndex, int length) {
-    // See above.
-    assert refCnt() > 0;
-    return udle.setBytesBounded(index, src.udle, srcIndex, length);
-  }
-
   @Override
   public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
     udle.setBytes(index + offset, src, srcIndex, length);
@@ -842,71 +823,4 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
       historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
     }
   }
-
-  // The "unsafe" methods are for use ONLY by code that does its own
-  // bounds checking. They are called "unsafe" for a reason: they will crash
-  // the JVM if values are addressed out of bounds.
-
-  /**
-   * Write an integer to the buffer at the given byte index, without
-   * bounds checks.
-   *
-   * @param offset byte (not int) offset of the location to write
-   * @param value the value to write
-   */
-
-  public void unsafePutInt(int offset, int value) {
-    PlatformDependent.putInt(addr + offset, value);
-  }
-
-  /**
-   * Write a long to the buffer at the given byte index, without
-   * bounds checks.
-   *
-   * @param index byte (not long) offset of the location to write
-   * @param value the value to write
-   */
-
-  public void unsafePutLong(int index, long value) {
-    PlatformDependent.putLong(addr + index, value);
-  }
-
-  /**
-   * Write a short to the buffer at the given byte index, without
-   * bounds checks.
-   *
-   * @param offset byte (not short) offset of the location to write
-   * @param value the value to write
-   */
-
-  public void unsafePutShort(int offset, short value) {
-    PlatformDependent.putShort(addr + offset, value);
-  }
-
-  /**
-   * Write a byte to the buffer at the given byte index, without
-   * bounds checks.
-   *
-   * @param offset byte offset of the location to write
-   * @param value the value to write
-   */
-
-  public void unsafePutByte(int offset, byte value) {
-    PlatformDependent.putByte(addr + offset, value);
-  }
-
-  /**
-   * Copy a buffer of heap data to the buffer memory.
-   *
-   * @param srce source byte buffer
-   * @param srcOffset offset within the byte buffer of the start of data
-   * @param destOffset byte offset into this buffer to which to write the
-   * data
-   * @param length length of the data, which must be within the
-   * bounds of this buffer
-   */
-
-  public void unsafeCopyMemory(byte[] srce, int srcOffset, int destOffset, int length) {
-    PlatformDependent.copyMemory(srce, srcOffset, addr + destOffset, length);
-  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/codegen/templates/ColumnAccessors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 33b12be..14ec1e8 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -276,14 +276,14 @@ public class ColumnAccessors {
       <#assign putAddr = "writeIndex * VALUE_WIDTH">
       </#if>
       <#if varWidth>
-      drillBuf.unsafeCopyMemory(value, 0, offset, len);
+      drillBuf.setBytes(offset, value, 0, len);
       offsetsWriter.setNextOffset(offset + len);
       <#elseif drillType == "Decimal9">
-      drillBuf.unsafePutInt(${putAddr},
+      drillBuf.setInt(${putAddr},
           DecimalUtility.getDecimal9FromBigDecimal(value,
                 type.getScale(), type.getPrecision()));
       <#elseif drillType == "Decimal18">
-      drillBuf.unsafePutLong(${putAddr},
+      drillBuf.setLong(${putAddr},
           DecimalUtility.getDecimal18FromBigDecimal(value,
                 type.getScale(), type.getPrecision()));
       <#elseif drillType == "Decimal38Sparse">
@@ -295,23 +295,23 @@ public class ColumnAccessors {
       DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH,
                type.getScale(), type.getPrecision(), 5);
       <#elseif drillType == "IntervalYear">
-      drillBuf.unsafePutInt(${putAddr},
+      drillBuf.setInt(${putAddr},
                 value.getYears() * 12 + value.getMonths());
       <#elseif drillType == "IntervalDay">
       final int offset = ${putAddr};
-      drillBuf.unsafePutInt(offset,     value.getDays());
-      drillBuf.unsafePutInt(offset + 4, periodToMillis(value));
+      drillBuf.setInt(offset,     value.getDays());
+      drillBuf.setInt(offset + 4, periodToMillis(value));
       <#elseif drillType == "Interval">
       final int offset = ${putAddr};
-      drillBuf.unsafePutInt(offset,     value.getYears() * 12 + value.getMonths());
-      drillBuf.unsafePutInt(offset + 4, value.getDays());
-      drillBuf.unsafePutInt(offset + 8, periodToMillis(value));
+      drillBuf.setInt(offset,     value.getYears() * 12 + value.getMonths());
+      drillBuf.setInt(offset + 4, value.getDays());
+      drillBuf.setInt(offset + 8, periodToMillis(value));
       <#elseif drillType == "Float4">
-      drillBuf.unsafePutInt(${putAddr}, Float.floatToRawIntBits((float) value));
+      drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value));
       <#elseif drillType == "Float8">
-      drillBuf.unsafePutLong(${putAddr}, Double.doubleToRawLongBits(value));
+      drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value));
       <#else>
-      drillBuf.unsafePut${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
+      drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
       </#if>
       vectorIndex.nextElement();
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
index e49f92c..1107216 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
@@ -98,7 +98,7 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {
       while (dest < writeIndex) {
         int length = writeIndex - dest;
         length = Math.min(length, stride);
-        drillBuf.unsafeCopyMemory(ZERO_BUF, 0, dest * width, length * width);
+        drillBuf.setBytes(dest * width, ZERO_BUF, 0, length * width);
         dest += length;
       }
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 6da2b50..2068304 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -30,8 +30,45 @@ import org.joda.time.Period;
 
 public class NullableScalarWriter extends AbstractScalarWriter {
 
+  public static final class ChildIndex implements ColumnWriterIndex {
+
+    private final ColumnWriterIndex parentIndex;
+
+    public ChildIndex(ColumnWriterIndex parentIndex) {
+      this.parentIndex = parentIndex;
+    }
+
+    @Override
+    public int rowStartIndex() {
+      return parentIndex.rowStartIndex();
+    }
+
+    @Override
+    public int vectorIndex() {
+      return parentIndex.vectorIndex();
+    }
+
+    @Override
+    public void nextElement() {
+      // Ignore next element requests from children.
+      // Nullable writers have two children, we don't want
+      // to increment the index twice.
+    }
+
+    @Override
+    public void rollover() {
+      parentIndex.rollover();
+    }
+
+    @Override
+    public ColumnWriterIndex outerIndex() {
+      return parentIndex.outerIndex();
+    }
+  }
+
   private final UInt1ColumnWriter isSetWriter;
   private final BaseScalarWriter baseWriter;
+  private ColumnWriterIndex writerIndex;
 
   public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) {
     isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector());
@@ -54,8 +91,10 @@ public class NullableScalarWriter extends AbstractScalarWriter {
 
   @Override
   public void bindIndex(ColumnWriterIndex index) {
-    isSetWriter.bindIndex(index);
-    baseWriter.bindIndex(index);
+    writerIndex = index;
+    ColumnWriterIndex childIndex = new ChildIndex(index);
+    isSetWriter.bindIndex(childIndex);
+    baseWriter.bindIndex(childIndex);
   }
 
   @Override
@@ -76,24 +115,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
   public void setNull() {
     isSetWriter.setInt(0);
     baseWriter.skipNulls();
+    writerIndex.nextElement();
   }
 
   @Override
   public void setInt(int value) {
     baseWriter.setInt(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
   public void setLong(long value) {
     baseWriter.setLong(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
   public void setDouble(double value) {
     baseWriter.setDouble(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
@@ -105,24 +148,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
 
     baseWriter.setString(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
   public void setBytes(byte[] value, int len) {
     baseWriter.setBytes(value, len);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
   public void setDecimal(BigDecimal value) {
     baseWriter.setDecimal(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override
   public void setPeriod(Period value) {
     baseWriter.setPeriod(value);
     isSetWriter.setInt(1);
+    writerIndex.nextElement();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
index d5f9b30..49d16a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
@@ -171,7 +171,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
     if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
       realloc(MIN_BUFFER_SIZE);
     }
-    vector.getBuffer().unsafePutInt(0, 0);
+    vector.getBuffer().setInt(0, 0);
   }
 
   public int nextOffset() { return nextOffset; }
@@ -199,7 +199,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
 
     final int valueIndex = vectorIndex.vectorIndex();
     int writeIndex = valueIndex + 1;
-    if (lastWriteIndex < valueIndex - 1 || writeIndex >= capacity) {
+    if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
       writeIndex = prepareWrite(writeIndex);
     }
 
@@ -207,7 +207,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
     // Recall, it is the value index, which is one less than the (end)
     // offset index.
 
-    lastWriteIndex = writeIndex - 1;
+    lastWriteIndex = valueIndex;
     return writeIndex;
   }
 
@@ -220,24 +220,27 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
     // Call to resize may cause rollover, so reset write index
     // afterwards.
 
-    writeIndex = vectorIndex.vectorIndex() + 1;
+    final int valueIndex = vectorIndex.vectorIndex();
 
     // Fill empties to the write position.
+    // Fill empties works off the row index, not the write
+    // index. The write index is one past the row index.
+    // (Yes, this is complex...)
 
-    fillEmpties(writeIndex);
-    return writeIndex;
+    fillEmpties(valueIndex);
+    return valueIndex + 1;
   }
 
   @Override
-  protected final void fillEmpties(final int writeIndex) {
-    while (lastWriteIndex < writeIndex - 1) {
-      drillBuf.unsafePutInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
+  protected final void fillEmpties(final int valueIndex) {
+    while (lastWriteIndex < valueIndex - 1) {
+      drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
     }
   }
 
   public final void setNextOffset(final int newOffset) {
     final int writeIndex = writeIndex();
-    drillBuf.unsafePutInt(writeIndex * VALUE_WIDTH, newOffset);
+    drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
     nextOffset = newOffset;
   }
 
@@ -267,8 +270,17 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
   }
 
   @Override
-  public final void endWrite() {
-    setValueCount(vectorIndex.vectorIndex() + 1);
+  public void setValueCount(int valueCount) {
+
+    // Slightly different version of the fixed-width writer
+    // code. Offset counts are one greater than the value count.
+    // But for all other purposes, we track the value (row)
+    // position, not the offset position.
+
+    mandatoryResize(valueCount);
+    fillEmpties(valueCount);
+    vector().getBuffer().writerIndex((valueCount + 1) * width());
+    lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1);
   }
 
   @Override
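
Editor's note on the hunk above: the comments rely on the offset-vector invariant that a variable-width column with n values carries n + 1 offsets, which is why setValueCount() moves the writer index to (valueCount + 1) * width(). A standalone sketch of that layout (not Drill code) follows; the writer seeds offsets[0] = 0 at the start of a write, as the setInt(0, 0) change above does, and a filled-in empty simply repeats the previous end offset.

import java.util.Arrays;

// Value i occupies bytes [offsets[i], offsets[i + 1]).
public class OffsetVectorSketch {
  public static void main(String[] args) {
    String[] values = {"a", "", "bcd"};        // "" plays the role of a filled empty
    int[] offsets = new int[values.length + 1];
    offsets[0] = 0;                            // seeded at the start of a write
    for (int i = 0; i < values.length; i++) {
      offsets[i + 1] = offsets[i] + values[i].length();
    }
    System.out.println(Arrays.toString(offsets));  // [0, 1, 1, 4]
    // valueCount = 3, but the offset vector holds valueCount + 1 = 4 entries.
  }
}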


[2/2] drill git commit: Merge branch 'master' of https://github.com/apache/drill

Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/drill


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ee9e613d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ee9e613d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ee9e613d

Branch: refs/heads/master
Commit: ee9e613d779c5f2961df3f26f5392240b735da5e
Parents: f0d00c6 cee67de
Author: Boaz Ben-Zvi <bo...@apache.org>
Authored: Tue Jan 30 19:56:16 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 19:56:16 2018 -0800

----------------------------------------------------------------------

----------------------------------------------------------------------