You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by bo...@apache.org on 2018/01/31 03:56:37 UTC
[1/2] drill git commit: DRILL-6080: Sort incorrectly limits batch
size to 65535 records
Repository: drill
Updated Branches:
refs/heads/master cee67de2e -> ee9e613d7
DRILL-6080: Sort incorrectly limits batch size to 65535 records
closes #1090
* Sort incorrectly limits batch size to 65535 records rather than 65536.
* This PR also includes a few code cleanup items.
* Fix for overflow in offset vector in row set writer
* Performance tool update
* Replace "unsafe" methods with "set" methods
* Also fixes an indexing issue with nullable writers
* Removed debug & timing code
* Increase strictness for batch size
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f0d00c62
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f0d00c62
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f0d00c62
Branch: refs/heads/master
Commit: f0d00c62b594e424ea085ebd0a5be26f0f509fda
Parents: 039530a
Author: Paul Rogers <pr...@maprtech.com>
Authored: Wed Jan 10 16:04:53 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 19:54:57 2018 -0800
----------------------------------------------------------------------
.../impl/sort/SortRecordBatchBuilder.java | 6 +-
.../exec/physical/impl/xsort/MSortTemplate.java | 9 +-
.../exec/record/selection/SelectionVector4.java | 4 +-
.../impl/xsort/managed/TestSortImpl.java | 72 ++++++++--------
.../drill/test/rowSet/test/PerformanceTool.java | 10 +++
.../rowSet/test/TestVariableWidthWriter.java | 2 +-
.../src/main/java/io/netty/buffer/DrillBuf.java | 86 --------------------
.../main/codegen/templates/ColumnAccessors.java | 24 +++---
.../writer/AbstractFixedWidthWriter.java | 2 +-
.../accessor/writer/NullableScalarWriter.java | 51 +++++++++++-
.../accessor/writer/OffsetVectorWriter.java | 36 +++++---
11 files changed, 145 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 6c66c01..d995902 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -105,7 +105,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
return;
}
- if(runningBatches >= Character.MAX_VALUE) {
+ if (runningBatches >= Character.MAX_VALUE) {
final String errMsg = String.format("Tried to add more than %d number of batches.", (int) Character.MAX_VALUE);
logger.error(errMsg);
throw new DrillRuntimeException(errMsg);
@@ -152,7 +152,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
if (svBuffer == null) {
throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
}
- sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
+ sv4 = new SelectionVector4(svBuffer, recordCount, ValueVector.MAX_ROW_COUNT);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
@@ -174,7 +174,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
int recordBatchId = 0;
for (RecordBatchData d : data) {
for (int i = 0; i < d.getRecordCount(); i++, index++) {
- sv4.set(index, recordBatchId, (int) d.getSv2().getIndex(i));
+ sv4.set(index, recordBatchId, d.getSv2().getIndex(i));
}
// might as well drop the selection vector since we'll stop using it now.
d.getSv2().clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9b69170..afbc58b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -32,10 +32,10 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.util.IndexedSortable;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;
public abstract class MSortTemplate implements MSorter, IndexedSortable {
@@ -43,7 +43,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
private SelectionVector4 vector4;
private SelectionVector4 aux;
- private long compares;
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
private FragmentContext context;
@@ -74,13 +73,14 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
throw new UnsupportedOperationException(String.format("Missing batch. batch: %d newBatch: %d", batch, newBatch));
}
}
+ @SuppressWarnings("resource")
final DrillBuf drillBuf = allocator.buffer(4 * totalCount);
try {
desiredRecordBatchCount = context.getConfig().getInt(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE);
} catch(ConfigException.Missing e) {
// value not found, use default value instead
- desiredRecordBatchCount = Character.MAX_VALUE;
+ desiredRecordBatchCount = ValueVector.MAX_ROW_COUNT;
}
aux = new SelectionVector4(drillBuf, totalCount, desiredRecordBatchCount);
}
@@ -126,7 +126,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
@Override
public void sort(final VectorContainer container) {
- final Stopwatch watch = Stopwatch.createStarted();
while (runStarts.size() > 1) {
// check if we're cancelled/failed frequently
@@ -153,6 +152,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
if (outIndex < vector4.getTotalCount()) {
copyRun(outIndex, vector4.getTotalCount());
}
+ @SuppressWarnings("resource")
final SelectionVector4 tmp = aux.createNewWrapperCurrent(desiredRecordBatchCount);
aux.clear();
aux = vector4.createNewWrapperCurrent(desiredRecordBatchCount);
@@ -181,7 +181,6 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
public int compare(final int leftIndex, final int rightIndex) {
final int sv1 = vector4.get(leftIndex);
final int sv2 = vector4.get(rightIndex);
- compares++;
try {
return doEval(sv1, sv2);
} catch (SchemaChangeException e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
index b51fdca..2c10d6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector4.java
@@ -112,8 +112,8 @@ public class SelectionVector4 implements AutoCloseable {
return false;
}
- start = start+length;
- int newEnd = Math.min(start+length, recordCount);
+ start = start + length;
+ int newEnd = Math.min(start + length, recordCount);
length = newEnd - start;
// logger.debug("New start {}, new length {}", start, length);
return true;
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index cdca30e..a985478 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -23,13 +23,13 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.drill.categories.OperatorTest;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
@@ -38,6 +38,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.DirectRowSet;
@@ -45,16 +46,18 @@ import org.apache.drill.test.rowSet.HyperRowSetImpl;
import org.apache.drill.test.rowSet.IndirectRowSet;
import org.apache.drill.test.rowSet.RowSet;
import org.apache.drill.test.rowSet.RowSet.ExtendableRowSet;
-import org.apache.drill.test.rowSet.RowSetReader;
-import org.apache.drill.test.rowSet.RowSetWriter;
import org.apache.drill.test.rowSet.RowSetBuilder;
import org.apache.drill.test.rowSet.RowSetComparison;
+import org.apache.drill.test.rowSet.RowSetReader;
+import org.apache.drill.test.rowSet.RowSetWriter;
import org.apache.drill.test.rowSet.SchemaBuilder;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
-import com.google.common.base.Stopwatch;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.junit.experimental.categories.Category;
+
+import io.netty.buffer.DrillBuf;
/**
* Tests the external sort implementation: the "guts" of the sort stripped of the
@@ -157,12 +160,6 @@ public class TestSortImpl extends DrillTest {
for (RowSet expectedSet : expected) {
assertTrue(results.next());
RowSet rowSet = toRowSet(results, dest);
- // Uncomment these for debugging. Leave them commented otherwise
- // to avoid polluting the Maven build output unnecessarily.
-// System.out.println("Expected:");
-// expectedSet.print();
-// System.out.println("Actual:");
-// rowSet.print();
new RowSetComparison(expectedSet)
.verify(rowSet);
expectedSet.clear();
@@ -325,7 +322,8 @@ public class TestSortImpl extends DrillTest {
public DataGenerator(OperatorFixture fixture, int targetCount, int batchSize, int seed, int step) {
this.fixture = fixture;
this.targetCount = targetCount;
- this.batchSize = Math.min(batchSize, Character.MAX_VALUE);
+ Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT);
+ this.batchSize = batchSize;
this.step = step;
schema = SortTestUtilities.nonNullSchema();
currentValue = seed;
@@ -380,7 +378,8 @@ public class TestSortImpl extends DrillTest {
public DataValidator(int targetCount, int batchSize) {
this.targetCount = targetCount;
- this.batchSize = Math.min(batchSize, Character.MAX_VALUE);
+ Preconditions.checkArgument(batchSize > 0 && batchSize <= ValueVector.MAX_ROW_COUNT);
+ this.batchSize = batchSize;
}
public void validate(RowSet output) {
@@ -400,8 +399,6 @@ public class TestSortImpl extends DrillTest {
}
}
- Stopwatch timer = Stopwatch.createUnstarted();
-
/**
* Run a full-blown sort test with multiple input batches. Because we want to
* generate multiple inputs, we don't create them statically. Instead, we generate
@@ -428,33 +425,25 @@ public class TestSortImpl extends DrillTest {
if (batchCount == 1) {
// Simulates a NEW_SCHEMA event
- timer.start();
sort.setSchema(input.container().getSchema());
- timer.stop();
}
// Simulates an OK event
- timer.start();
sort.addBatch(input.vectorAccessible());
- timer.stop();
}
// Simulate returning results
- timer.start();
SortResults results = sort.startMerge();
if (results.getContainer() != dest) {
dest.clear();
dest = results.getContainer();
}
while (results.next()) {
- timer.stop();
RowSet output = toRowSet(results, dest);
validator.validate(output);
- timer.start();
}
- timer.stop();
validator.validateDone();
results.close();
dest.clear();
@@ -469,11 +458,9 @@ public class TestSortImpl extends DrillTest {
*/
public void runJumboBatchTest(OperatorFixture fixture, int rowCount) {
- timer.reset();
- DataGenerator dataGen = new DataGenerator(fixture, rowCount, Character.MAX_VALUE);
- DataValidator validator = new DataValidator(rowCount, Character.MAX_VALUE);
+ DataGenerator dataGen = new DataGenerator(fixture, rowCount, ValueVector.MAX_ROW_COUNT);
+ DataValidator validator = new DataValidator(rowCount, ValueVector.MAX_ROW_COUNT);
runLargeSortTest(fixture, dataGen, validator);
- System.out.println(timer.elapsed(TimeUnit.MILLISECONDS));
}
/**
@@ -499,7 +486,30 @@ public class TestSortImpl extends DrillTest {
@Test
public void testLargeBatch() throws Exception {
try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
- runJumboBatchTest(fixture, Character.MAX_VALUE);
+// partyOnMemory(fixture.allocator());
+ runJumboBatchTest(fixture, ValueVector.MAX_ROW_COUNT);
+ }
+ }
+
+ /**
+ * Use this function to pre-load Netty's free list with a large
+ * number of "dirty" blocks. This will often catch error due to
+ * failure to initialize value vector memory.
+ *
+ * @param fixture the operator fixture that provides an allocator
+ */
+
+ @SuppressWarnings("unused")
+ private void partyOnMemory(BufferAllocator allocator) {
+ DrillBuf bufs[] = new DrillBuf[10];
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i] = allocator.buffer(ValueVector.MAX_BUFFER_SIZE);
+ for (int j = 0; j < ValueVector.MAX_BUFFER_SIZE; j += 4) {
+ bufs[i].setInt(j, 0xDEADBEEF);
+ }
+ }
+ for (int i = 0; i < bufs.length; i++) {
+ bufs[i].release();
}
}
@@ -533,8 +543,6 @@ public class TestSortImpl extends DrillTest {
VectorContainer dest = new VectorContainer();
SortImpl sort = makeSortImpl(fixture, Ordering.ORDER_ASC, Ordering.NULLS_UNSPECIFIED, dest);
- timer.reset();
- timer.start();
sort.setSchema(rowSet.container().getSchema());
sort.addBatch(rowSet.vectorAccessible());
SortResults results = sort.startMerge();
@@ -543,13 +551,11 @@ public class TestSortImpl extends DrillTest {
dest = results.getContainer();
}
assertTrue(results.next());
- timer.stop();
assertFalse(results.next());
results.close();
dest.clear();
sort.close();
sort.opContext().close();
- System.out.println(timer.elapsed(TimeUnit.MILLISECONDS));
}
/**
@@ -561,7 +567,7 @@ public class TestSortImpl extends DrillTest {
@Test
public void testWideRows() throws Exception {
try (OperatorFixture fixture = OperatorFixture.standardFixture()) {
- runWideRowsTest(fixture, 1000, Character.MAX_VALUE);
+ runWideRowsTest(fixture, 1000, ValueVector.MAX_ROW_COUNT);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
index e84f2d3..ca282a1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/PerformanceTool.java
@@ -47,6 +47,16 @@ import com.google.common.base.Stopwatch;
* <p>
* Current results:
* <ul>
+ * <li>Required and nullable writers are slightly faster than the
+ * corresponding vector mutator methods.</li>
+ * <li>Writer is 230% faster than a repeated mutator.</li>
+ * </ul>
+ *
+ * The key reason for the converged performance (now compared to earlier
+ * results below) is that both paths now use bounds-checking optimizations.
+ * <p>
+ * Prior results before the common bounds-check optimizations:
+ * <ul>
* <li>Writer is 42% faster than a required mutator.</li>
* <li>Writer is 73% faster than a nullable mutator.</li>
* <li>Writer is 407% faster than a repeated mutator.</li>
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
index 103b212..912f362 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/rowSet/test/TestVariableWidthWriter.java
@@ -373,7 +373,7 @@ public class TestVariableWidthWriter extends SubOperatorTest {
@Override
public boolean canExpand(ScalarWriter writer, int delta) {
- System.out.println("Delta: " + delta);
+// System.out.println("Delta: " + delta);
totalAlloc += delta;
return totalAlloc < 1024 * 1024;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
index 115d31e..109500a 100644
--- a/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/memory/base/src/main/java/io/netty/buffer/DrillBuf.java
@@ -712,25 +712,6 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
return this;
}
- // Clone of UDLE's setBytes(), but with bounds checking done as a boolean,
- // not assertion.
-
- public boolean setBytesBounded(int index, byte[] src, int srcIndex, int length) {
- // Must do here because Drill's UDLE is not ref counted.
- // Done as an assert to avoid production overhead: if this is going
- // to fail, it will do so spectacularly in tests, due to a programming error.
- assert refCnt() > 0;
- return udle.setBytesBounded(index, src, srcIndex, length);
- }
-
- // As above, but for direct memory.
-
- public boolean setBytesBounded(int index, DrillBuf src, int srcIndex, int length) {
- // See above.
- assert refCnt() > 0;
- return udle.setBytesBounded(index, src.udle, srcIndex, length);
- }
-
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
udle.setBytes(index + offset, src, srcIndex, length);
@@ -842,71 +823,4 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
historicalLog.buildHistory(sb, indent + 1, verbosity.includeStackTraces);
}
}
-
- // The "unsafe" methods are for use ONLY by code that does its own
- // bounds checking. They are called "unsafe" for a reason: they will crash
- // the JVM if values are addressed out of bounds.
-
- /**
- * Write an integer to the buffer at the given byte index, without
- * bounds checks.
- *
- * @param offset byte (not int) offset of the location to write
- * @param value the value to write
- */
-
- public void unsafePutInt(int offset, int value) {
- PlatformDependent.putInt(addr + offset, value);
- }
-
- /**
- * Write a long to the buffer at the given byte index, without
- * bounds checks.
- *
- * @param index byte (not long) offset of the location to write
- * @param value the value to write
- */
-
- public void unsafePutLong(int index, long value) {
- PlatformDependent.putLong(addr + index, value);
- }
-
- /**
- * Write a short to the buffer at the given byte index, without
- * bounds checks.
- *
- * @param offset byte (not short) offset of the location to write
- * @param value the value to write
- */
-
- public void unsafePutShort(int offset, short value) {
- PlatformDependent.putShort(addr + offset, value);
- }
-
- /**
- * Write a byte to the buffer at the given byte index, without
- * bounds checks.
- *
- * @param offset byte offset of the location to write
- * @param value the value to write
- */
-
- public void unsafePutByte(int offset, byte value) {
- PlatformDependent.putByte(addr + offset, value);
- }
-
- /**
- * Copy a buffer of heap data to the buffer memory.
- *
- * @param srce source byte buffer
- * @param srcOffset offset within the byte buffer of the start of data
- * @param destOffset byte offset into this buffer to which to write the
- * data
- * @param length length of the data, which must be within the
- * bounds of this buffer
- */
-
- public void unsafeCopyMemory(byte[] srce, int srcOffset, int destOffset, int length) {
- PlatformDependent.copyMemory(srce, srcOffset, addr + destOffset, length);
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/codegen/templates/ColumnAccessors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/ColumnAccessors.java b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
index 33b12be..14ec1e8 100644
--- a/exec/vector/src/main/codegen/templates/ColumnAccessors.java
+++ b/exec/vector/src/main/codegen/templates/ColumnAccessors.java
@@ -276,14 +276,14 @@ public class ColumnAccessors {
<#assign putAddr = "writeIndex * VALUE_WIDTH">
</#if>
<#if varWidth>
- drillBuf.unsafeCopyMemory(value, 0, offset, len);
+ drillBuf.setBytes(offset, value, 0, len);
offsetsWriter.setNextOffset(offset + len);
<#elseif drillType == "Decimal9">
- drillBuf.unsafePutInt(${putAddr},
+ drillBuf.setInt(${putAddr},
DecimalUtility.getDecimal9FromBigDecimal(value,
type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal18">
- drillBuf.unsafePutLong(${putAddr},
+ drillBuf.setLong(${putAddr},
DecimalUtility.getDecimal18FromBigDecimal(value,
type.getScale(), type.getPrecision()));
<#elseif drillType == "Decimal38Sparse">
@@ -295,23 +295,23 @@ public class ColumnAccessors {
DecimalUtility.getSparseFromBigDecimal(value, vector.getBuffer(), writeIndex * VALUE_WIDTH,
type.getScale(), type.getPrecision(), 5);
<#elseif drillType == "IntervalYear">
- drillBuf.unsafePutInt(${putAddr},
+ drillBuf.setInt(${putAddr},
value.getYears() * 12 + value.getMonths());
<#elseif drillType == "IntervalDay">
final int offset = ${putAddr};
- drillBuf.unsafePutInt(offset, value.getDays());
- drillBuf.unsafePutInt(offset + 4, periodToMillis(value));
+ drillBuf.setInt(offset, value.getDays());
+ drillBuf.setInt(offset + 4, periodToMillis(value));
<#elseif drillType == "Interval">
final int offset = ${putAddr};
- drillBuf.unsafePutInt(offset, value.getYears() * 12 + value.getMonths());
- drillBuf.unsafePutInt(offset + 4, value.getDays());
- drillBuf.unsafePutInt(offset + 8, periodToMillis(value));
+ drillBuf.setInt(offset, value.getYears() * 12 + value.getMonths());
+ drillBuf.setInt(offset + 4, value.getDays());
+ drillBuf.setInt(offset + 8, periodToMillis(value));
<#elseif drillType == "Float4">
- drillBuf.unsafePutInt(${putAddr}, Float.floatToRawIntBits((float) value));
+ drillBuf.setInt(${putAddr}, Float.floatToRawIntBits((float) value));
<#elseif drillType == "Float8">
- drillBuf.unsafePutLong(${putAddr}, Double.doubleToRawLongBits(value));
+ drillBuf.setLong(${putAddr}, Double.doubleToRawLongBits(value));
<#else>
- drillBuf.unsafePut${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
+ drillBuf.set${putType?cap_first}(${putAddr}, <#if doCast>(${putType}) </#if>value);
</#if>
vectorIndex.nextElement();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
index e49f92c..1107216 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/AbstractFixedWidthWriter.java
@@ -98,7 +98,7 @@ public abstract class AbstractFixedWidthWriter extends BaseScalarWriter {
while (dest < writeIndex) {
int length = writeIndex - dest;
length = Math.min(length, stride);
- drillBuf.unsafeCopyMemory(ZERO_BUF, 0, dest * width, length * width);
+ drillBuf.setBytes(dest * width, ZERO_BUF, 0, length * width);
dest += length;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
index 6da2b50..2068304 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/NullableScalarWriter.java
@@ -30,8 +30,45 @@ import org.joda.time.Period;
public class NullableScalarWriter extends AbstractScalarWriter {
+ public static final class ChildIndex implements ColumnWriterIndex {
+
+ private final ColumnWriterIndex parentIndex;
+
+ public ChildIndex(ColumnWriterIndex parentIndex) {
+ this.parentIndex = parentIndex;
+ }
+
+ @Override
+ public int rowStartIndex() {
+ return parentIndex.rowStartIndex();
+ }
+
+ @Override
+ public int vectorIndex() {
+ return parentIndex.vectorIndex();
+ }
+
+ @Override
+ public void nextElement() {
+ // Ignore next element requests from children.
+ // Nullable writers have two children, we don't want
+ // to increment the index twice.
+ }
+
+ @Override
+ public void rollover() {
+ parentIndex.rollover();
+ }
+
+ @Override
+ public ColumnWriterIndex outerIndex() {
+ return parentIndex.outerIndex();
+ }
+ }
+
private final UInt1ColumnWriter isSetWriter;
private final BaseScalarWriter baseWriter;
+ private ColumnWriterIndex writerIndex;
public NullableScalarWriter(NullableVector nullableVector, BaseScalarWriter baseWriter) {
isSetWriter = new UInt1ColumnWriter(nullableVector.getBitsVector());
@@ -54,8 +91,10 @@ public class NullableScalarWriter extends AbstractScalarWriter {
@Override
public void bindIndex(ColumnWriterIndex index) {
- isSetWriter.bindIndex(index);
- baseWriter.bindIndex(index);
+ writerIndex = index;
+ ColumnWriterIndex childIndex = new ChildIndex(index);
+ isSetWriter.bindIndex(childIndex);
+ baseWriter.bindIndex(childIndex);
}
@Override
@@ -76,24 +115,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
public void setNull() {
isSetWriter.setInt(0);
baseWriter.skipNulls();
+ writerIndex.nextElement();
}
@Override
public void setInt(int value) {
baseWriter.setInt(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setLong(long value) {
baseWriter.setLong(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setDouble(double value) {
baseWriter.setDouble(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
@@ -105,24 +148,28 @@ public class NullableScalarWriter extends AbstractScalarWriter {
baseWriter.setString(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setBytes(byte[] value, int len) {
baseWriter.setBytes(value, len);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setDecimal(BigDecimal value) {
baseWriter.setDecimal(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
public void setPeriod(Period value) {
baseWriter.setPeriod(value);
isSetWriter.setInt(1);
+ writerIndex.nextElement();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/f0d00c62/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
index d5f9b30..49d16a3 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/writer/OffsetVectorWriter.java
@@ -171,7 +171,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
if (capacity * VALUE_WIDTH < MIN_BUFFER_SIZE) {
realloc(MIN_BUFFER_SIZE);
}
- vector.getBuffer().unsafePutInt(0, 0);
+ vector.getBuffer().setInt(0, 0);
}
public int nextOffset() { return nextOffset; }
@@ -199,7 +199,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
final int valueIndex = vectorIndex.vectorIndex();
int writeIndex = valueIndex + 1;
- if (lastWriteIndex < valueIndex - 1 || writeIndex >= capacity) {
+ if (lastWriteIndex + 1 < valueIndex || writeIndex >= capacity) {
writeIndex = prepareWrite(writeIndex);
}
@@ -207,7 +207,7 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
// Recall, it is the value index, which is one less than the (end)
// offset index.
- lastWriteIndex = writeIndex - 1;
+ lastWriteIndex = valueIndex;
return writeIndex;
}
@@ -220,24 +220,27 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
// Call to resize may cause rollover, so reset write index
// afterwards.
- writeIndex = vectorIndex.vectorIndex() + 1;
+ final int valueIndex = vectorIndex.vectorIndex();
// Fill empties to the write position.
+ // Fill empties works off the row index, not the write
+ // index. The write index is one past the row index.
+ // (Yes, this is complex...)
- fillEmpties(writeIndex);
- return writeIndex;
+ fillEmpties(valueIndex);
+ return valueIndex + 1;
}
@Override
- protected final void fillEmpties(final int writeIndex) {
- while (lastWriteIndex < writeIndex - 1) {
- drillBuf.unsafePutInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
+ protected final void fillEmpties(final int valueIndex) {
+ while (lastWriteIndex < valueIndex - 1) {
+ drillBuf.setInt((++lastWriteIndex + 1) * VALUE_WIDTH, nextOffset);
}
}
public final void setNextOffset(final int newOffset) {
final int writeIndex = writeIndex();
- drillBuf.unsafePutInt(writeIndex * VALUE_WIDTH, newOffset);
+ drillBuf.setInt(writeIndex * VALUE_WIDTH, newOffset);
nextOffset = newOffset;
}
@@ -267,8 +270,17 @@ public class OffsetVectorWriter extends AbstractFixedWidthWriter {
}
@Override
- public final void endWrite() {
- setValueCount(vectorIndex.vectorIndex() + 1);
+ public void setValueCount(int valueCount) {
+
+ // Slightly different version of the fixed-width writer
+ // code. Offset counts are one greater than the value count.
+ // But for all other purposes, we track the value (row)
+ // position, not the offset position.
+
+ mandatoryResize(valueCount);
+ fillEmpties(valueCount);
+ vector().getBuffer().writerIndex((valueCount + 1) * width());
+ lastWriteIndex = Math.max(lastWriteIndex, valueCount - 1);
}
@Override
[2/2] drill git commit: Merge branch 'master' of
https://github.com/apache/drill
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/drill
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ee9e613d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ee9e613d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ee9e613d
Branch: refs/heads/master
Commit: ee9e613d779c5f2961df3f26f5392240b735da5e
Parents: f0d00c6 cee67de
Author: Boaz Ben-Zvi <bo...@apache.org>
Authored: Tue Jan 30 19:56:16 2018 -0800
Committer: Boaz Ben-Zvi <bo...@apache.org>
Committed: Tue Jan 30 19:56:16 2018 -0800
----------------------------------------------------------------------
----------------------------------------------------------------------