You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 18:29:11 UTC
[3/5] drill git commit: DRILL-5325: Unit tests for the managed sort
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
index 76b178c..7a460f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierTemplate.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.xsort.managed;
-import io.netty.buffer.DrillBuf;
-
import java.io.IOException;
import java.util.List;
@@ -26,11 +24,11 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.AllocationHelper;
+
+import io.netty.buffer.DrillBuf;
public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierTemplate.class);
@@ -43,7 +41,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
private int queueSize = 0;
@Override
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+ public void setup(BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
VectorAccessible outgoing) throws SchemaChangeException {
this.hyperBatch = hyperBatch;
this.batchGroups = batchGroups;
@@ -53,7 +51,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
@SuppressWarnings("resource")
final DrillBuf drillBuf = allocator.buffer(4 * size);
vector4 = new SelectionVector4(drillBuf, size, Character.MAX_VALUE);
- doSetup(context, hyperBatch, outgoing);
+ doSetup(hyperBatch, outgoing);
queueSize = 0;
for (int i = 0; i < size; i++) {
@@ -68,7 +66,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
@Override
public int next(int targetRecordCount) {
- allocateVectors(targetRecordCount);
+ VectorAccessibleUtilities.allocateVectors(outgoing, targetRecordCount);
for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
if (queueSize == 0) {
return 0;
@@ -76,7 +74,11 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
int compoundIndex = vector4.get(0);
int batch = compoundIndex >>> 16;
assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
- doCopy(compoundIndex, outgoingIndex);
+ try {
+ doCopy(compoundIndex, outgoingIndex);
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException(e);
+ }
int nextIndex = batchGroups.get(batch).getNextIndex();
if (nextIndex < 0) {
vector4.set(0, vector4.get(--queueSize));
@@ -84,37 +86,28 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
vector4.set(0, batch, nextIndex);
}
if (queueSize == 0) {
- setValueCount(++outgoingIndex);
+ VectorAccessibleUtilities.setValueCount(outgoing, ++outgoingIndex);
return outgoingIndex;
}
- siftDown();
+ try {
+ siftDown();
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException(e);
+ }
}
- setValueCount(targetRecordCount);
+ VectorAccessibleUtilities.setValueCount(outgoing, targetRecordCount);
return targetRecordCount;
}
- private void setValueCount(int count) {
- for (VectorWrapper<?> w: outgoing) {
- w.getValueVector().getMutator().setValueCount(count);
- }
- }
-
@Override
public void close() throws IOException {
vector4.clear();
- for (final VectorWrapper<?> w: outgoing) {
- w.getValueVector().clear();
- }
- for (final VectorWrapper<?> w : hyperBatch) {
- w.clear();
- }
-
- for (BatchGroup batchGroup : batchGroups) {
- batchGroup.close();
- }
+ VectorAccessibleUtilities.clear(outgoing);
+ VectorAccessibleUtilities.clear(hyperBatch);
+ BatchGroup.closeAll(batchGroups);
}
- private void siftUp() {
+ private void siftUp() throws SchemaChangeException {
int p = queueSize;
while (p > 0) {
if (compare(p, (p - 1) / 2) < 0) {
@@ -126,13 +119,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
}
- private void allocateVectors(int targetRecordCount) {
- for (VectorWrapper<?> w: outgoing) {
- AllocationHelper.allocateNew(w.getValueVector(), targetRecordCount);
- }
- }
-
- private void siftDown() {
+ private void siftDown() throws SchemaChangeException {
int p = 0;
int next;
while (p * 2 + 1 < queueSize) { // While the current node has at least one child
@@ -160,14 +147,19 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
vector4.set(sv1, tmp);
}
- public int compare(int leftIndex, int rightIndex) {
+ public int compare(int leftIndex, int rightIndex) throws SchemaChangeException {
int sv1 = vector4.get(leftIndex);
int sv2 = vector4.get(rightIndex);
return doEval(sv1, sv2);
}
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
- public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
- public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
-
+ public abstract void doSetup(@Named("incoming") VectorAccessible incoming,
+ @Named("outgoing") VectorAccessible outgoing)
+ throws SchemaChangeException;
+ public abstract int doEval(@Named("leftIndex") int leftIndex,
+ @Named("rightIndex") int rightIndex)
+ throws SchemaChangeException;
+ public abstract void doCopy(@Named("inIndex") int inIndex,
+ @Named("outIndex") int outIndex)
+ throws SchemaChangeException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
new file mode 100644
index 0000000..6b71782
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/PriorityQueueCopierWrapper.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.CopyUtil;
+import org.apache.drill.exec.vector.ValueVector;
+
+import com.google.common.base.Stopwatch;
+
+/**
+ * Manages a {@link PriorityQueueCopier} instance produced from code generation.
+ * Provides a wrapper around a copier "session" to simplify reading batches
+ * from the copier.
+ * <p>
+ * The copier is generated on first use and then reused by later merges
+ * until {@link #close()} discards it.
+ */
+
+public class PriorityQueueCopierWrapper extends BaseSortWrapper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueCopierWrapper.class);
+
+  /** Routes generated copy code into the copier's {@code doSetup} and {@code doCopy} methods. */
+  private static final GeneratorMapping COPIER_MAPPING = new GeneratorMapping("doSetup", "doCopy", null, null);
+  private static final MappingSet COPIER_MAPPING_SET = new MappingSet(COPIER_MAPPING, COPIER_MAPPING);
+
+  /**
+   * A single PriorityQueueCopier instance is used for 2 purposes:
+   * 1. Merge sorted batches before spilling
+   * 2. Merge sorted batches when all incoming data fits in memory
+   * <p>
+   * Null until first requested, and again after {@link #close()}.
+   */
+
+  private PriorityQueueCopier copier;
+
+  public PriorityQueueCopierWrapper(OperExecContext opContext) {
+    super(opContext);
+  }
+
+  /**
+   * Returns the copier, generating it on the first call. Later calls return
+   * the existing instance (the {@code batch} argument is then ignored) until
+   * {@link #close()} discards it.
+   *
+   * @param batch batch whose schema drives code generation
+   * @return the generated copier
+   */
+  public PriorityQueueCopier getCopier(VectorAccessible batch) {
+    if (copier == null) {
+      copier = newCopier(batch);
+    }
+    return copier;
+  }
+
+  /**
+   * Generates the comparison and copy code for the schema of {@code batch}
+   * and instantiates the resulting copier class.
+   *
+   * @param batch batch whose columns determine the generated code
+   * @return a new copier instance
+   */
+  private PriorityQueueCopier newCopier(VectorAccessible batch) {
+    // Generate the copier code and obtain the resulting class
+
+    CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptionSet());
+    ClassGenerator<PriorityQueueCopier> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+    // Uncomment this line to debug the generated code.
+//    cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch, logger);
+
+    // Copy code is generated under the copier mapping; the main mapping
+    // (MAIN_MAPPING, defined outside this class) is restored afterwards.
+    g.setMappingSet(COPIER_MAPPING_SET);
+    CopyUtil.generateCopies(g, batch, true);
+    g.setMappingSet(MAIN_MAPPING);
+    return getInstance(cg, logger);
+  }
+
+  /**
+   * Start a merge operation using the specified vector container. Used for
+   * the final merge operation.
+   *
+   * @param schema schema of the input batches, used to build the hyper batch
+   * @param batchGroupList the sorted batches to merge
+   * @param outputContainer container that receives each merged output batch
+   * @param targetRecordCount maximum number of records per output batch
+   * @return a merger from which merged batches are read via
+   * {@link BatchMerger#next()}
+   */
+  public BatchMerger startMerge(BatchSchema schema, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer, int targetRecordCount) {
+    return new BatchMerger(this, schema, batchGroupList, outputContainer, targetRecordCount);
+  }
+
+  /**
+   * Prepare the copier to merge a collection of sorted batches into
+   * {@code outputContainer}. The copier uses generated code to do the actual
+   * copying. If the copier has not yet been created, generate code and create
+   * it; otherwise the existing copier is reused as-is. A new vector is added
+   * to {@code outputContainer} for each column of {@code batch}, and the
+   * copier is then bound to the input and output.
+   *
+   * @param batch the (hyper) batch of vectors to be copied
+   * @param batchGroupList same batches as above, but represented as a list
+   * of individual batches
+   * @param outputContainer the container into which to copy the batches
+   * @throws UserException if copier setup reports a schema change
+   */
+
+  @SuppressWarnings("unchecked")
+  private void createCopier(VectorAccessible batch, List<? extends BatchGroup> batchGroupList, VectorContainer outputContainer) {
+    copier = getCopier(batch);
+
+    // Initialize the value vectors for the output container
+
+    for (VectorWrapper<?> i : batch) {
+      @SuppressWarnings("resource")
+      ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+      outputContainer.add(v);
+    }
+    try {
+      copier.setup(context.getAllocator(), batch, (List<BatchGroup>) batchGroupList, outputContainer);
+    } catch (SchemaChangeException e) {
+      throw UserException.unsupportedError(e)
+          .message("Unexpected schema change - likely code error.")
+          .build(logger);
+    }
+  }
+
+  public BufferAllocator getAllocator() { return context.getAllocator(); }
+
+  /**
+   * Closes and discards the copier, if one exists. The reference is cleared
+   * even when closing fails, so that a later {@link #getCopier} call
+   * generates a fresh copier instead of returning a partially-closed one.
+   *
+   * @throws UserException if the copier fails to close
+   */
+  public void close() {
+    if (copier == null) {
+      return;
+    }
+    try {
+      copier.close();
+    } catch (IOException e) {
+      throw UserException.dataWriteError(e)
+          .message("Failure while flushing spilled data")
+          .build(logger);
+    } finally {
+      copier = null;
+    }
+  }
+
+  /**
+   * We've gathered a set of batches, each of which has been sorted. The batches
+   * may have passed through a filter and thus may have "holes" where rows have
+   * been filtered out. We will spill records in blocks of targetRecordCount.
+   * To prepare, copy that many records into an outputContainer as a set of
+   * contiguous values in new vectors. The result is a single batch with
+   * vectors that combine a collection of input batches up to the
+   * given threshold.
+   * <p>
+   * Input. Here the top line is a selection vector of indexes.
+   * The second line is a set of batch groups (separated by underscores)
+   * with letters indicating individual records:<pre>
+   * [3 7 4 8 0 6 1] [5 3 6 8 2 0]
+   * [eh_ad_ibf]     [r_qm_kn_p]</pre>
+   * <p>
+   * Output, assuming blocks of 5 records. The brackets represent
+   * batches, the line represents the set of batches copied to the
+   * spill file.<pre>
+   * [abcde] [fhikm] [npqr]</pre>
+   * <p>
+   * The copying operation does a merge as well: copying
+   * values from the sources in ordered fashion. Consider a different example:
+   * we want to merge two input batches to produce a single output batch:
+   * <pre>
+   * Input:  [aceg] [bdfh]
+   * Output: [abcdefgh]</pre>
+   * <p>
+   * In the above, the input consists of two sorted batches. (In reality,
+   * the input batches have an associated selection vector, but that is omitted
+   * here and just the sorted values shown.) The output is a single batch
+   * with the merged records (indicated by letters) from the two input batches.
+   * <p>
+   * Here we bind the copier to the batchGroupList of sorted, buffered batches
+   * to be merged. We bind the copier output to outputContainer: the copier will write its
+   * merged "batches" of records to that container.
+   * <p>
+   * Calls to the {@link #next()} method sequentially return merged batches
+   * of the desired row count.
+   */
+
+  public static class BatchMerger implements SortResults, AutoCloseable {
+
+    private final PriorityQueueCopierWrapper holder;
+    private final VectorContainer hyperBatch;
+    private final VectorContainer outputContainer;
+    private final int targetRecordCount;
+    // Running total of records copied; maintained but not read in this class.
+    private int copyCount;
+    private int batchCount;
+    private long estBatchSize;
+
+    /**
+     * Creates a merger with a temporary output container.
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+        int targetRecordCount) {
+      this(holder, schema, batchGroupList, new VectorContainer(), targetRecordCount);
+    }
+
+    /**
+     * Creates a merger with the specified output container.
+     *
+     * @param holder the copier that does the work
+     * @param schema schema for the input and output batches
+     * @param batchGroupList the input batches
+     * @param outputContainer merges output batch into the given output container
+     * @param targetRecordCount number of records for each output batch
+     */
+    private BatchMerger(PriorityQueueCopierWrapper holder, BatchSchema schema, List<? extends BatchGroup> batchGroupList,
+        VectorContainer outputContainer, int targetRecordCount) {
+      this.holder = holder;
+      hyperBatch = constructHyperBatch(schema, batchGroupList);
+      copyCount = 0;
+      this.targetRecordCount = targetRecordCount;
+      this.outputContainer = outputContainer;
+      holder.createCopier(hyperBatch, batchGroupList, outputContainer);
+    }
+
+    /**
+     * Read the next merged batch into the output container. The batch holds
+     * the target record count, but may hold fewer if this is the last batch.
+     * The output container's record count is set to the number of records
+     * copied (0 when the input is exhausted).
+     *
+     * @return true if a batch with at least one record was produced, false
+     * if no more records are available
+     */
+
+    @Override
+    public boolean next() {
+      Stopwatch w = Stopwatch.createStarted();
+      long start = holder.getAllocator().getAllocatedMemory();
+      int count = holder.copier.next(targetRecordCount);
+      copyCount += count;
+      if (count > 0) {
+        long t = w.elapsed(TimeUnit.MICROSECONDS);
+        batchCount++;
+        logger.trace("Took {} us to merge {} records", t, count);
+        // Track the largest observed allocation growth as the batch-size estimate.
+        long size = holder.getAllocator().getAllocatedMemory() - start;
+        estBatchSize = Math.max(estBatchSize, size);
+      } else {
+        logger.trace("copier returned 0 records");
+      }
+
+      // Identify the schema to be used in the output container. (Since
+      // all merged batches have the same schema, the schema we identify
+      // here should be the same as that which we already had.)
+
+      outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+
+      // The copier does not set the record count in the output
+      // container, so do that here.
+
+      outputContainer.setRecordCount(count);
+
+      return count > 0;
+    }
+
+    /**
+     * Construct a vector container that holds a list of batches, each represented as an
+     * array of vectors. The entire collection of vectors has a common schema.
+     * <p>
+     * To build the collection, we go through the current schema (which has been
+     * devised to be common for all batches.) For each field in the schema, we create
+     * an array of vectors. To create the elements, we iterate over all the incoming
+     * batches and search for the vector that matches the current column.
+     * <p>
+     * Finally, we build a new schema for the combined container. That new schema must,
+     * because of the way the container was created, match the current schema.
+     *
+     * @param schema schema for the hyper batch
+     * @param batchGroupList list of batches to combine
+     * @return a container where each column is represented as an array of vectors
+     * (hence the "hyper" in the method name)
+     */
+
+    private VectorContainer constructHyperBatch(BatchSchema schema, List<? extends BatchGroup> batchGroupList) {
+      VectorContainer cont = new VectorContainer();
+      for (MaterializedField field : schema) {
+        ValueVector[] vectors = new ValueVector[batchGroupList.size()];
+        int i = 0;
+        for (BatchGroup group : batchGroupList) {
+          vectors[i++] = group.getValueAccessorById(
+              field.getValueClass(),
+              group.getValueVectorId(SchemaPath.getSimplePath(field.getPath())).getFieldIds())
+              .getValueVector();
+        }
+        cont.add(vectors);
+      }
+      cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
+      return cont;
+    }
+
+    /**
+     * Clears the hyper batch and closes the owning wrapper, which closes and
+     * discards its copier.
+     */
+    @Override
+    public void close() {
+      hyperBatch.clear();
+      holder.close();
+    }
+
+    @Override
+    public int getRecordCount() { return outputContainer.getRecordCount(); }
+
+    @Override
+    public int getBatchCount() { return batchCount; }
+
+    /**
+     * Gets the estimated batch size, in bytes. Use for estimating the memory
+     * needed to process the batches that this operator created.
+     * @return the size of the largest batch created by this operation,
+     * in bytes
+     */
+
+    public long getEstBatchSize() { return estBatchSize; }
+
+    @Override
+    public SelectionVector4 getSv4() { return null; }
+
+    @Override
+    public SelectionVector2 getSv2() { return null; }
+
+    @Override
+    public VectorContainer getContainer() { return outputContainer; }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
new file mode 100644
index 0000000..e47d67e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+
+/**
+ * Configuration settings for the managed external sort, read from a
+ * {@link DrillConfig} and adjusted so that each value falls in a workable
+ * range.
+ */
+public class SortConfig {
+
+  /**
+   * Smallest allowed output batch size. The smallest output batch
+   * created even under constrained memory conditions.
+   * (Same value as {@link #MIN_MERGE_BATCH_SIZE}.)
+   */
+  public static final int MIN_MERGED_BATCH_SIZE = 256 * 1024;
+
+  /**
+   * In the bizarre case where the user gave us an unrealistically low
+   * spill file size, set a floor at some bare minimum size. (Note that,
+   * at this size, big queries will create a huge number of files, which
+   * is why the configuration default is on the order of hundreds of MB.)
+   */
+
+  public static final long MIN_SPILL_FILE_SIZE = 1 * 1024 * 1024;
+
+  public static final int DEFAULT_SPILL_BATCH_SIZE = 8 * 1024 * 1024;
+  /** Floor applied to the configured spill batch size. */
+  public static final int MIN_SPILL_BATCH_SIZE = 256 * 1024;
+  /** Floor applied to the configured merge batch size. */
+  public static final int MIN_MERGE_BATCH_SIZE = 256 * 1024;
+
+  /** Smallest positive merge limit allowed; merging needs at least two runs. */
+  public static final int MIN_MERGE_LIMIT = 2;
+
+  /** Configured memory limit, in bytes (typically used only for testing). */
+  private final long maxMemory;
+
+  /**
+   * Maximum number of spilled runs that can be merged in a single pass.
+   */
+
+  private final int mergeLimit;
+
+  /**
+   * Target size of the first-generation spill files.
+   */
+  private final long spillFileSize;
+
+  /** Configured spill batch size, in bytes, no smaller than {@link #MIN_SPILL_BATCH_SIZE}. */
+  private final int spillBatchSize;
+
+  /** Configured merge batch size, in bytes, no smaller than {@link #MIN_MERGE_BATCH_SIZE}. */
+  private final int mergeBatchSize;
+
+  /** Limit on the number of in-memory batches; {@code Integer.MAX_VALUE} means unlimited. */
+  private final int bufferedBatchLimit;
+
+
+  public SortConfig(DrillConfig config) {
+
+    // Optional configured memory limit, typically used only for testing.
+
+    maxMemory = config.getBytes(ExecConstants.EXTERNAL_SORT_MAX_MEMORY);
+
+    // Optional limit on the number of spilled runs to merge in a single
+    // pass. Limits the number of open file handles. Must allow at least
+    // two batches to merge to make progress. Zero or negative means no limit.
+
+    int limit = config.getInt(ExecConstants.EXTERNAL_SORT_MERGE_LIMIT);
+    if (limit > 0) {
+      mergeLimit = Math.max(limit, MIN_MERGE_LIMIT);
+    } else {
+      mergeLimit = Integer.MAX_VALUE;
+    }
+
+    // Limits the size of first-generation spill files.
+    // Ensure the size is reasonable.
+
+    spillFileSize = Math.max(config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_FILE_SIZE), MIN_SPILL_FILE_SIZE);
+    spillBatchSize = boundedInt(config.getBytes(ExecConstants.EXTERNAL_SORT_SPILL_BATCH_SIZE), MIN_SPILL_BATCH_SIZE);
+
+    // Target size of merged output batches, with a floor of
+    // MIN_MERGE_BATCH_SIZE. No adjustment against available memory is
+    // made here.
+
+    mergeBatchSize = boundedInt(config.getBytes(ExecConstants.EXTERNAL_SORT_MERGE_BATCH_SIZE), MIN_MERGE_BATCH_SIZE);
+
+    // Limit on in-memory batches, primarily for testing. Zero means
+    // unlimited; any other value is raised to at least 2.
+
+    int value = config.getInt(ExecConstants.EXTERNAL_SORT_BATCH_LIMIT);
+    if (value == 0) {
+      bufferedBatchLimit = Integer.MAX_VALUE;
+    } else {
+      bufferedBatchLimit = Math.max(value, 2);
+    }
+    logConfig();
+  }
+
+  /**
+   * Applies a lower bound to a configured byte count and caps the result at
+   * {@link Integer#MAX_VALUE}, so that a very large configured value does not
+   * wrap to a negative {@code int} when narrowed.
+   *
+   * @param value configured value, in bytes
+   * @param min smallest allowed result
+   * @return {@code value} limited to the range [min, Integer.MAX_VALUE]
+   */
+  private static int boundedInt(long value, int min) {
+    return (int) Math.min(Math.max(value, min), Integer.MAX_VALUE);
+  }
+
+  /** Logs the effective spill and merge settings at debug level. */
+  private void logConfig() {
+    ExternalSortBatch.logger.debug("Config: " +
+        "spill file size = {}, spill batch size = {}, " +
+        "merge limit = {}, merge batch size = {}",
+        spillFileSize(), spillBatchSize(),
+        mergeLimit(), mergeBatchSize());
+  }
+
+  public long maxMemory() { return maxMemory; }
+  public int mergeLimit() { return mergeLimit; }
+  public long spillFileSize() { return spillFileSize; }
+  public int spillBatchSize() { return spillBatchSize; }
+  public int mergeBatchSize() { return mergeBatchSize; }
+  public int getBufferedBatchLimit() { return bufferedBatchLimit; }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
new file mode 100644
index 0000000..6f0da3d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortImpl.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortMemoryManager.MergeTask;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorAccessibleUtilities;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+/**
+ * Implementation of the external sort which is wrapped into the Drill
+ * "next" protocol by the {@link ExternalSortBatch} class.
+ * <p>
+ * Accepts incoming batches. Sorts each and will spill to disk as needed.
+ * When all input is delivered, can either do an in-memory merge or a
+ * merge from disk. If runs spilled, may have to do one or more "consolidation"
+ * passes to reduce the number of runs to the level that will fit in memory.
+ */
+
+ public class SortImpl {
+   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatch.class);
+
+   /**
+    * Iterates over the final sorted results. Implemented differently
+    * depending on whether the results are in-memory or spilled to
+    * disk.
+    */
+
+   public interface SortResults {
+     /**
+      * Container into which results are delivered. May be the
+      * original operator container, or may be a different
+      * one. This is the container that should be sent
+      * downstream. This is a fixed value for all returned
+      * results.
+      * @return the container holding the results to send downstream
+      */
+     VectorContainer getContainer();
+
+     /**
+      * Advance to the next batch of results.
+      * @return true if another batch is available, false when done
+      */
+     boolean next();
+     void close();
+     int getBatchCount();
+     int getRecordCount();
+     SelectionVector2 getSv2();
+     SelectionVector4 getSv4();
+   }
+
+   private final SortConfig config;
+   private final SortMetrics metrics;
+   private final SortMemoryManager memManager;
+   private VectorContainer outputBatch;
+   private final OperExecContext context;
+
+   /**
+    * Memory allocator for this operator itself. Incoming batches are
+    * transferred into this allocator. Intermediate batches used during
+    * merge also reside here.
+    */
+
+   private final BufferAllocator allocator;
+
+   private final SpilledRuns spilledRuns;
+
+   private final BufferedBatches bufferedBatches;
+
+   public SortImpl(OperExecContext opContext, SortConfig sortConfig, SpilledRuns spilledRuns, VectorContainer batch) {
+     this.context = opContext;
+     outputBatch = batch;
+     this.spilledRuns = spilledRuns;
+     allocator = opContext.getAllocator();
+     config = sortConfig;
+     memManager = new SortMemoryManager(config, allocator.getLimit());
+     metrics = new SortMetrics(opContext.getStats());
+     bufferedBatches = new BufferedBatches(opContext);
+
+     // Reset the allocator to allow a 10% safety margin. This is done because
+     // the memory manager will enforce the original limit. Changing the hard
+     // limit will reduce the probability that random chance causes the allocator
+     // to kill the query because of a small, spurious over-allocation.
+
+     allocator.setLimit((long)(allocator.getLimit() * 1.10));
+   }
+
+   public void setSchema(BatchSchema schema) {
+     bufferedBatches.setSchema(schema);
+     spilledRuns.setSchema(schema);
+   }
+
+   /**
+    * Spill buffered batches now, regardless of memory pressure.
+    * @return false (and does nothing) when fewer than two batches are
+    * buffered, true after a spill was performed
+    */
+
+   public boolean forceSpill() {
+     if (bufferedBatches.size() < 2) {
+       return false;
+     }
+     spillFromMemory();
+     return true;
+   }
+
+   /**
+    * Process the converted incoming batch by adding it to the in-memory store
+    * of data, or spilling data to disk when necessary.
+    * @param incoming
+    */
+
+   public void addBatch(VectorAccessible incoming) {
+
+     // Skip empty batches (such as the first one.)
+
+     if (incoming.getRecordCount() == 0) {
+       VectorAccessibleUtilities.clear(incoming);
+       return;
+     }
+
+     // Determine actual sizes of the incoming batch before taking
+     // ownership. Allows us to figure out if we need to spill first,
+     // to avoid overflowing memory simply due to ownership transfer.
+
+     RecordBatchSizer sizer = analyzeIncomingBatch(incoming);
+
+     // The heart of the external sort operator: spill to disk when
+     // the in-memory generation exceeds the allowed memory limit.
+     // Preemptively spill BEFORE accepting the new batch into our memory
+     // pool. The allocator will throw an OOM exception if we accept the
+     // batch when we are near the limit - despite the fact that the batch
+     // is already in memory and no new memory is allocated during the transfer.
+
+     if (isSpillNeeded(sizer.actualSize())) {
+       spillFromMemory();
+     }
+
+     // We expect to now be below the buffer memory maximum (not enforced here;
+     // the allocator itself has the final say). Measure the memory delta of
+     // accepting the batch.
+
+     long startMem = allocator.getAllocatedMemory();
+     bufferedBatches.add(incoming, sizer.netSize());
+
+     // Compute batch size, including allocation of an sv2.
+
+     long endMem = allocator.getAllocatedMemory();
+     long batchSize = endMem - startMem;
+
+     // Update the minimum buffer space metric.
+
+     metrics.updateInputMetrics(sizer.rowCount(), sizer.actualSize());
+     metrics.updateMemory(memManager.freeMemory(endMem));
+     metrics.updatePeakBatches(bufferedBatches.size());
+
+     // Update the size based on the actual record count, not
+     // the effective count as given by the selection vector
+     // (which may exclude some records due to filtering.)
+
+     validateBatchSize(sizer.actualSize(), batchSize);
+     memManager.updateEstimates((int) batchSize, sizer.netRowWidth(), sizer.rowCount());
+   }
+
+   /**
+    * Scan the vectors in the incoming batch to determine batch size.
+    *
+    * @return an analysis of the incoming batch
+    */
+
+   private RecordBatchSizer analyzeIncomingBatch(VectorAccessible incoming) {
+     RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+     sizer.applySv2();
+     if (metrics.getInputBatchCount() == 0) {
+       logger.debug("{}", sizer.toString());
+     }
+     return sizer;
+   }
+
+   /**
+    * Determine if spill is needed before receiving the new record batch.
+    * Spilling is driven purely by memory availability (and an optional
+    * batch limit for testing.)
+    *
+    * @return true if spilling is needed, false otherwise
+    */
+
+   private boolean isSpillNeeded(int incomingSize) {
+
+     // Can't spill if less than two batches else the merge
+     // can't make progress.
+
+     if (bufferedBatches.size() < 2) {
+       return false;
+     }
+
+     if (bufferedBatches.size() >= config.getBufferedBatchLimit()) {
+       return true;
+     }
+     return memManager.isSpillNeeded(allocator.getAllocatedMemory(), incomingSize);
+   }
+
+   /**
+    * Debug-log any mismatch between the sizer's batch size and the
+    * observed allocator delta.
+    */
+
+   private void validateBatchSize(long actualBatchSize, long memoryDelta) {
+     if (actualBatchSize != memoryDelta) {
+       ExternalSortBatch.logger.debug("Memory delta: {}, actual batch size: {}, Diff: {}",
+           memoryDelta, actualBatchSize, memoryDelta - actualBatchSize);
+     }
+   }
+
+   /**
+    * This operator has accumulated a set of sorted incoming record batches.
+    * We wish to spill some of them to disk. To do this, a "copier"
+    * merges the target batches to produce a stream of new (merged) batches
+    * which are then written to disk.
+    * <p>
+    * Which batches to spill is decided by
+    * {@code BufferedBatches.prepareSpill()} given the configured spill file
+    * size; the intent is to spill only part of the accumulated batches
+    * to minimize unnecessary disk writes.
+    */
+
+   private void spillFromMemory() {
+     int startCount = bufferedBatches.size();
+     List<BatchGroup> batchesToSpill = bufferedBatches.prepareSpill(config.spillFileSize());
+
+     // Do the actual spill.
+
+     logger.trace("Spilling {} of {} batches, memory = {}",
+         batchesToSpill.size(), startCount,
+         allocator.getAllocatedMemory());
+     int spillBatchRowCount = memManager.getSpillBatchRowCount();
+     spilledRuns.mergeAndSpill(batchesToSpill, spillBatchRowCount);
+     metrics.incrSpillCount();
+   }
+
+   public SortMetrics getMetrics() { return metrics; }
+
+   /**
+    * Results for a sort that received no rows: never produces a batch.
+    */
+
+   public static class EmptyResults implements SortResults {
+
+     private final VectorContainer dest;
+
+     public EmptyResults(VectorContainer dest) {
+       this.dest = dest;
+     }
+
+     @Override
+     public boolean next() { return false; }
+
+     @Override
+     public void close() { }
+
+     @Override
+     public int getBatchCount() { return 0; }
+
+     @Override
+     public int getRecordCount() { return 0; }
+
+     @Override
+     public SelectionVector4 getSv4() { return null; }
+
+     @Override
+     public SelectionVector2 getSv2() { return null; }
+
+     @Override
+     public VectorContainer getContainer() { return dest; }
+   }
+
+   /**
+    * Begin delivering results once all input has been received. Chooses
+    * between an empty result, a single-batch pass-through, an in-memory
+    * merge, or a merge of spilled runs.
+    *
+    * @return an iterator over the sorted results
+    */
+
+   public SortResults startMerge() {
+     if (metrics.getInputRowCount() == 0) {
+       return new EmptyResults(outputBatch);
+     }
+
+     logger.debug("Completed load phase: read {} batches, spilled {} times, total input bytes: {}",
+         metrics.getInputBatchCount(), spilledRuns.size(),
+         metrics.getInputBytes());
+
+     // Do the merge of the loaded batches. The merge can be done entirely in memory if
+     // the results fit; else we have to do a disk-based merge of
+     // pre-sorted spilled batches.
+
+     boolean optimizeOn = true; // Debug only
+     if (optimizeOn && metrics.getInputBatchCount() == 1) {
+       return singleBatchResult();
+     } else if (canUseMemoryMerge()) {
+       return mergeInMemory();
+     } else {
+       return mergeSpilledRuns();
+     }
+   }
+
+   /**
+    * Return results for a single input batch. No merge is needed;
+    * the original (sorted) input batch is simply passed as the result.
+    * Note that this version requires replacing the operator output
+    * container with the batch container. (Vector ownership transfer
+    * was already done when accepting the input batch.)
+    */
+
+   public static class SingleBatchResults implements SortResults {
+
+     private boolean done;
+     private final BatchGroup.InputBatch batch;
+
+     public SingleBatchResults(BatchGroup.InputBatch batch) {
+       this.batch = batch;
+     }
+
+     @Override
+     public boolean next() {
+       if (done) {
+         return false;
+       }
+       done = true;
+       return true;
+     }
+
+     @Override
+     public void close() {
+       try {
+         batch.close();
+       } catch (IOException e) {
+         // Should never occur for an input batch
+         throw new IllegalStateException(e);
+       }
+     }
+
+     @Override
+     public int getBatchCount() { return 1; }
+
+     @Override
+     public int getRecordCount() { return batch.getRecordCount(); }
+
+     @Override
+     public SelectionVector4 getSv4() { return null; }
+
+     @Override
+     public SelectionVector2 getSv2() { return batch.getSv2(); }
+
+     @Override
+     public VectorContainer getContainer() { return batch.getContainer(); }
+   }
+
+   /**
+    * Input consists of a single batch. Just return that batch as
+    * the output.
+    * @return results iterator over the single input batch
+    */
+
+   private SortResults singleBatchResult() {
+     List<InputBatch> batches = bufferedBatches.removeAll();
+     return new SingleBatchResults(batches.get(0));
+   }
+
+   /**
+    * All data has been read from the upstream batch. Determine if we
+    * can use a fast in-memory sort, or must use a merge (which typically,
+    * but not always, involves spilled batches.)
+    *
+    * @return whether sufficient resources exist to do an in-memory sort
+    * if all batches are still in memory
+    */
+
+   private boolean canUseMemoryMerge() {
+     if (spilledRuns.hasSpilled()) {
+       return false;
+     }
+
+     // Do we have enough memory for MSorter (the in-memory sorter)?
+
+     if (! memManager.hasMemoryMergeCapacity(allocator.getAllocatedMemory(), MSortTemplate.memoryNeeded(metrics.getInputRowCount()))) {
+       return false;
+     }
+
+     // Make sure we don't exceed the maximum number of batches SV4 can address.
+
+     if (bufferedBatches.size() > Character.MAX_VALUE) {
+       return false;
+     }
+
+     // We can do an in-memory merge.
+
+     return true;
+   }
+
+   /**
+    * Perform an in-memory sort of the buffered batches. Obviously can
+    * be used only for the non-spilling case.
+    *
+    * @return the merge wrapper, which iterates over the sorted results;
+    * the caller is responsible for closing it
+    */
+
+   private SortResults mergeInMemory() {
+     logger.debug("Starting in-memory sort. Batches = {}, Records = {}, Memory = {}",
+         bufferedBatches.size(), metrics.getInputRowCount(),
+         allocator.getAllocatedMemory());
+
+     // Note the difference between how we handle batches here and in the spill/merge
+     // case. In the spill/merge case, this class decides on the batch size to send
+     // downstream. However, in the in-memory case, we must pass along all batches
+     // in a single SV4. Attempts to do paging will result in errors. In the memory
+     // merge case, the downstream Selection Vector Remover will split the one
+     // big SV4 into multiple smaller batches to send further downstream.
+
+     // If the sort fails, clean up here. Otherwise, cleanup is done
+     // by closing the results iterator after all results are returned downstream.
+
+     MergeSortWrapper memoryMerge = new MergeSortWrapper(context, outputBatch);
+     try {
+       memoryMerge.merge(bufferedBatches.removeAll());
+     } catch (Throwable t) {
+       // Don't let a cleanup failure mask the original error.
+       try {
+         memoryMerge.close();
+       } catch (RuntimeException closeEx) {
+         if (closeEx != t) {
+           t.addSuppressed(closeEx);
+         }
+       }
+       throw t;
+     }
+     logger.debug("Completed in-memory sort. Memory = {}",
+         allocator.getAllocatedMemory());
+     return memoryMerge;
+   }
+
+   /**
+    * Perform merging of (typically spilled) batches. First consolidates batches
+    * as needed, then performs a final merge that is read one batch at a time
+    * to deliver batches to the downstream operator.
+    *
+    * @return an iterator over the merged batches
+    */
+
+   private SortResults mergeSpilledRuns() {
+     logger.debug("Starting consolidate phase. Batches = {}, Records = {}, Memory = {}, In-memory batches {}, spilled runs {}",
+         metrics.getInputBatchCount(), metrics.getInputRowCount(),
+         allocator.getAllocatedMemory(),
+         bufferedBatches.size(), spilledRuns.size());
+
+     // Consolidate batches to a number that can be merged in
+     // a single last pass.
+
+     loop:
+     for (;;) {
+       MergeTask task = memManager.consolidateBatches(
+           allocator.getAllocatedMemory(),
+           bufferedBatches.size(),
+           spilledRuns.size());
+       switch (task.action) {
+       case SPILL:
+         spillFromMemory();
+         break;
+       case MERGE:
+         mergeRuns(task.count);
+         break;
+       case NONE:
+         break loop;
+       default:
+         throw new IllegalStateException("Unexpected action: " + task.action);
+       }
+     }
+
+     int mergeRowCount = memManager.getMergeBatchRowCount();
+     return spilledRuns.finalMerge(bufferedBatches.removeAll(), outputBatch, mergeRowCount);
+   }
+
+   /**
+    * Merge {@code targetCount} spilled runs into a new spilled run.
+    */
+
+   private void mergeRuns(int targetCount) {
+     long mergeMemoryPool = memManager.getMergeMemoryLimit();
+     int spillBatchRowCount = memManager.getSpillBatchRowCount();
+     spilledRuns.mergeRuns(targetCount, mergeMemoryPool, spillBatchRowCount);
+     metrics.incrMergeCount();
+   }
+
+   /**
+    * Release spilled runs and buffered batches. Both are always closed; the
+    * first failure is rethrown with any later failure attached as suppressed.
+    */
+
+   public void close() {
+     metrics.updateWriteBytes(spilledRuns.getWriteBytes());
+     RuntimeException ex = null;
+     try {
+       spilledRuns.close();
+     } catch (RuntimeException e) {
+       ex = e;
+     }
+     try {
+       bufferedBatches.close();
+     } catch (RuntimeException e) {
+       if (ex == null) {
+         ex = e;
+       } else if (ex != e) {
+         ex.addSuppressed(e);
+       }
+     }
+     if (ex != null) {
+       throw ex;
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
new file mode 100644
index 0000000..213720f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMemoryManager.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import com.google.common.annotations.VisibleForTesting;
+
+ /**
+  * Computes the memory-driven settings of the managed external sort: the
+  * buffer (load-phase) memory limit, spill and merge batch sizes and row
+  * counts, the merge memory limit, and consolidation decisions during the
+  * merge phase. Estimates are updated from observed incoming batches.
+  */
+ public class SortMemoryManager {
+
+   /**
+    * Maximum memory this operator may use. Usually comes from the
+    * operator definition, but may be overridden by a configuration
+    * parameter for unit testing.
+    */
+
+   private final long memoryLimit;
+
+   /**
+    * Estimated size of the records for this query, updated on each
+    * new batch received from upstream.
+    */
+
+   private int estimatedRowWidth;
+
+   /**
+    * Size of the merge batches that this operator produces. Generally
+    * the same as the merge batch size, unless low memory forces a smaller
+    * value.
+    */
+
+   private int expectedMergeBatchSize;
+
+   /**
+    * Estimate of the input batch size based on the largest batch seen
+    * thus far.
+    */
+   private int estimatedInputBatchSize;
+
+   /**
+    * Maximum memory level before spilling occurs. That is, we can buffer input
+    * batches in memory until we reach the level given by the buffer memory pool.
+    */
+
+   private long bufferMemoryLimit;
+
+   /**
+    * Maximum memory that can hold batches during the merge
+    * phase.
+    */
+
+   private long mergeMemoryLimit;
+
+   /**
+    * The target size for merge batches sent downstream.
+    */
+
+   private int preferredMergeBatchSize;
+
+   /**
+    * The configured size for each spill batch.
+    */
+   private int preferredSpillBatchSize;
+
+   /**
+    * Estimated number of rows that fit into a single spill batch.
+    */
+
+   private int spillBatchRowCount;
+
+   /**
+    * The estimated actual spill batch size which depends on the
+    * details of the data rows for any particular query.
+    */
+
+   private int expectedSpillBatchSize;
+
+   /**
+    * The number of records to add to each output batch sent to the
+    * downstream operator or spilled to disk.
+    */
+
+   private int mergeBatchRowCount;
+
+   private final SortConfig config;
+
+   /**
+    * Estimated size of an incoming batch plus its sv2: the largest
+    * input batch seen, plus 4 bytes per row of the latest batch.
+    */
+
+   private int estimatedInputSize;
+
+   /**
+    * Set when the low-memory sanity checks find that even the minimum
+    * load or merge needs exceed the memory limit.
+    */
+
+   private boolean potentialOverflow;
+
+   public SortMemoryManager(SortConfig config, long memoryLimit) {
+     this.config = config;
+
+     // The maximum memory this operator can use as set by the
+     // operator definition (propagated to the allocator.)
+
+     if (config.maxMemory() > 0) {
+       this.memoryLimit = Math.min(memoryLimit, config.maxMemory());
+     } else {
+       this.memoryLimit = memoryLimit;
+     }
+
+     preferredSpillBatchSize = config.spillBatchSize();
+     preferredMergeBatchSize = config.mergeBatchSize();
+   }
+
+   /**
+    * Update the data-driven memory use numbers including:
+    * <ul>
+    * <li>The average size of incoming records.</li>
+    * <li>The estimated spill and output batch size.</li>
+    * <li>The estimated number of average-size records per
+    * spill and output batch.</li>
+    * <li>The amount of memory set aside to hold the incoming
+    * batches before spilling starts.</li>
+    * </ul>
+    * <p>
+    * Under normal circumstances, the amount of memory available is much
+    * larger than the input, spill or merge batch sizes. The primary question
+    * is to determine how many input batches we can buffer during the load
+    * phase, and how many spill batches we can merge during the merge
+    * phase.
+    *
+    * @param batchSize the overall size of the current batch received from
+    * upstream
+    * @param batchRowWidth the average width in bytes (including overhead) of
+    * rows in the current input batch
+    * @param batchRowCount the number of actual (not filtered) records in
+    * that upstream batch
+    */
+
+   public void updateEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+     // The record count should never be zero, but better safe than sorry...
+
+     if (batchRowCount == 0) {
+       return;
+     }
+
+     // Update input batch estimates.
+     // Go no further if nothing changed.
+
+     if (! updateInputEstimates(batchSize, batchRowWidth, batchRowCount)) {
+       return;
+     }
+
+     updateSpillSettings();
+     updateMergeSettings();
+     adjustForLowMemory();
+     logSettings(batchRowCount);
+   }
+
+   /**
+    * Fold one incoming batch into the row-width and input-batch-size
+    * estimates.
+    *
+    * @return true if the row width or input batch estimate changed
+    */
+
+   private boolean updateInputEstimates(int batchSize, int batchRowWidth, int batchRowCount) {
+
+     // The row width may end up as zero if all fields are nulls or some
+     // other unusual situation. In this case, assume a width of 10 just
+     // to avoid lots of special case code.
+
+     if (batchRowWidth == 0) {
+       batchRowWidth = 10;
+     }
+
+     // We know the batch size and number of records. Use that to estimate
+     // the average record size. Since a typical batch has many records,
+     // the average size is a fairly good estimator. Note that the batch
+     // size includes not just the actual vector data, but any unused space
+     // resulting from power-of-two allocation. This means that we don't
+     // have to do size adjustments for input batches as we will do below
+     // when estimating the size of other objects.
+
+     // Record sizes may vary across batches. To be conservative, use
+     // the largest size observed from incoming batches.
+
+     int origRowEstimate = estimatedRowWidth;
+     estimatedRowWidth = Math.max(estimatedRowWidth, batchRowWidth);
+
+     // Maintain an estimate of the incoming batch size: the largest
+     // batch yet seen. Used to reserve memory for the next incoming
+     // batch. Because we are using the actual observed batch size,
+     // the size already includes overhead due to power-of-two rounding.
+
+     long origInputBatchSize = estimatedInputBatchSize;
+     estimatedInputBatchSize = Math.max(estimatedInputBatchSize, batchSize);
+
+     // Estimate the total size of each incoming batch plus sv2. Note that, due
+     // to power-of-two rounding, the allocated sv2 size might be twice the data size.
+
+     estimatedInputSize = estimatedInputBatchSize + 4 * batchRowCount;
+
+     // Return whether anything changed.
+
+     return estimatedRowWidth != origRowEstimate || estimatedInputBatchSize != origInputBatchSize;
+   }
+
+   /**
+    * Determine the number of records to spill per spill batch. The goal is to
+    * spill batches of either 64K records, or as many records as fit into the
+    * amount of memory dedicated to each spill batch, whichever is less.
+    */
+
+   private void updateSpillSettings() {
+
+     spillBatchRowCount = rowsPerBatch(preferredSpillBatchSize);
+
+     // Compute the actual spill batch size which may be larger or smaller
+     // than the preferred size depending on the row width. Add an
+     // allowance (4/3) for power-of-two rounding.
+
+     expectedSpillBatchSize = batchForRows(spillBatchRowCount);
+
+     // Determine the minimum memory needed for spilling. Spilling is done just
+     // before accepting a spill batch, so we must spill if we don't have room for a
+     // (worst case) input batch. To spill, we need room for the spill batch created
+     // by merging the batches already in memory.
+
+     bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+   }
+
+   /**
+    * Determine the number of records per batch per merge step. The goal is to
+    * merge batches of either 64K records, or as many records as fit into the
+    * amount of memory dedicated to each merge batch, whichever is less.
+    */
+
+   private void updateMergeSettings() {
+
+     mergeBatchRowCount = rowsPerBatch(preferredMergeBatchSize);
+     expectedMergeBatchSize = batchForRows(mergeBatchRowCount);
+
+     // The merge memory pool assumes we can spill all input batches. The memory
+     // available to hold spill batches for merging is total memory minus the
+     // expected output batch size.
+
+     mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+   }
+
+   /**
+    * In a low-memory situation we have to approach the memory assignment
+    * problem from a different angle. Memory is low enough that we can't
+    * fit the incoming batches (of a size decided by the upstream operator)
+    * and our usual spill or merge batch sizes. Instead, we have to
+    * determine the largest spill and merge batch sizes possible given
+    * the available memory, input batch size and row width. We shrink the
+    * sizes of the batches we control to try to make things fit into limited
+    * memory. At some point, however, if we cannot fit even two input
+    * batches and even the smallest merge batch, then we will run into an
+    * out-of-memory condition and we log a warning.
+    * <p>
+    * Note that these calculations are a bit crazy: it is Drill that
+    * decided to allocate the small memory, it is Drill that created the
+    * large incoming batches, and so it is Drill that created the low
+    * memory situation. Over time, a better fix for this condition is to
+    * control memory usage at the query level so that the sort is guaranteed
+    * to have sufficient memory. But, since we don't yet have the luxury
+    * of making such changes, we just live with the situation as we find
+    * it.
+    */
+
+   private void adjustForLowMemory() {
+
+     // Use long math: doubling an int batch size can overflow.
+
+     long loadHeadroom = bufferMemoryLimit - 2L * estimatedInputSize;
+     long mergeHeadroom = mergeMemoryLimit - 2L * expectedSpillBatchSize;
+     if (loadHeadroom >= 0 && mergeHeadroom >= 0) {
+       return;
+     }
+
+     lowMemorySpillBatchSize();
+     lowMemoryMergeBatchSize();
+
+     // Sanity check: if we've been given too little memory to make progress,
+     // issue a warning but proceed anyway. Should only occur if something is
+     // configured terribly wrong.
+
+     long minNeeds = 2L * estimatedInputSize + expectedSpillBatchSize;
+     if (minNeeds > memoryLimit) {
+       ExternalSortBatch.logger.warn("Potential memory overflow during load phase! " +
+           "Minimum needed = {} bytes, actual available = {} bytes",
+           minNeeds, memoryLimit);
+       bufferMemoryLimit = 0;
+       potentialOverflow = true;
+     }
+
+     // Sanity check: merge needs two spill batches plus one output batch.
+
+     minNeeds = 2L * expectedSpillBatchSize + expectedMergeBatchSize;
+     if (minNeeds > memoryLimit) {
+       ExternalSortBatch.logger.warn("Potential memory overflow during merge phase! " +
+           "Minimum needed = {} bytes, actual available = {} bytes",
+           minNeeds, memoryLimit);
+       mergeMemoryLimit = 0;
+       potentialOverflow = true;
+     }
+   }
+
+   /**
+    * If we are in a low-memory condition, then we might not have room for the
+    * default spill batch size. In that case, pick a smaller size based on
+    * the observation that we need two input batches and
+    * one spill batch to make progress.
+    */
+
+   private void lowMemorySpillBatchSize() {
+
+     // The "expected" size is with power-of-two rounding in some vectors.
+     // We later work backwards to the row count assuming average internal
+     // fragmentation. Sizes are computed in long to avoid int wrap-around.
+
+     // Must hold two input batches. Use (most of) the rest for the spill batch.
+
+     long spillSize = memoryLimit - 2L * estimatedInputSize;
+
+     // But, in the merge phase, we need two spill batches and one output batch.
+     // (Assume that the spill and merge are equal sizes.)
+     // Use at most 1/3 of memory for each batch.
+
+     spillSize = Math.min(spillSize, memoryLimit / 3);
+
+     // Ensure we don't create batches smaller than the configured minimum.
+
+     spillSize = Math.max(spillSize, SortConfig.MIN_SPILL_BATCH_SIZE);
+
+     // Must hold at least one row to spill. That is, we can make progress if we
+     // create spill files that consist of single-record batches.
+
+     spillSize = Math.max(spillSize, estimatedRowWidth);
+     expectedSpillBatchSize = (int) Math.min(spillSize, Integer.MAX_VALUE);
+
+     // Work out the spill batch count needed by the spill code. Allow room for
+     // power-of-two rounding.
+
+     spillBatchRowCount = rowsPerBatch(expectedSpillBatchSize);
+
+     // Finally, figure out when we must spill.
+
+     bufferMemoryLimit = memoryLimit - expectedSpillBatchSize;
+   }
+
+   /**
+    * For merge batch, we must hold at least two spill batches and
+    * one output batch.
+    */
+
+   private void lowMemoryMergeBatchSize() {
+     long mergeSize = memoryLimit - 2L * expectedSpillBatchSize;
+     mergeSize = Math.max(mergeSize, SortConfig.MIN_MERGE_BATCH_SIZE);
+     mergeSize = Math.max(mergeSize, estimatedRowWidth);
+     expectedMergeBatchSize = (int) Math.min(mergeSize, Integer.MAX_VALUE);
+     mergeBatchRowCount = rowsPerBatch(expectedMergeBatchSize);
+     mergeMemoryLimit = memoryLimit - expectedMergeBatchSize;
+   }
+
+   /**
+    * Log the calculated values. Turn this on if things seem amiss.
+    * Message will appear only when the values change.
+    */
+
+   private void logSettings(int actualRecordCount) {
+
+     ExternalSortBatch.logger.debug("Input Batch Estimates: record size = {} bytes; input batch = {} bytes, {} records",
+         estimatedRowWidth, estimatedInputBatchSize, actualRecordCount);
+     ExternalSortBatch.logger.debug("Spill batch size = {} bytes, {} records; spill file size: {} bytes",
+         expectedSpillBatchSize, spillBatchRowCount, config.spillFileSize());
+     ExternalSortBatch.logger.debug("Output batch size = {} bytes, {} records",
+         expectedMergeBatchSize, mergeBatchRowCount);
+     ExternalSortBatch.logger.debug("Available memory: {}, buffer memory = {}, merge memory = {}",
+         memoryLimit, bufferMemoryLimit, mergeMemoryLimit);
+   }
+
+   public enum MergeAction { SPILL, MERGE, NONE }
+
+   public static class MergeTask {
+     public MergeAction action;
+     public int count;
+
+     public MergeTask(MergeAction action, int count) {
+       this.action = action;
+       this.count = count;
+     }
+   }
+
+   /**
+    * Decide the next consolidation step before the final merge.
+    *
+    * @param allocMemory currently allocated memory
+    * @param inMemCount number of batches still buffered in memory
+    * @param spilledRunsCount number of spilled runs
+    * @return SPILL to spill in-memory batches, MERGE with the number of runs
+    * to merge, or NONE when the remaining runs can be merged in one pass
+    */
+
+   public MergeTask consolidateBatches(long allocMemory, int inMemCount, int spilledRunsCount) {
+
+     // Determine additional memory needed to hold one batch from each
+     // spilled run.
+
+     // If the on-disk batches and in-memory batches need more memory than
+     // is available, spill some in-memory batches.
+
+     if (inMemCount > 0) {
+       // Multiply in long: the int product can overflow with many runs.
+       long mergeSize = (long) spilledRunsCount * expectedSpillBatchSize;
+       if (allocMemory + mergeSize > mergeMemoryLimit) {
+         return new MergeTask(MergeAction.SPILL, 0);
+       }
+     }
+
+     // Maximum batches that fit into available memory.
+
+     int mergeLimit = (int) ((mergeMemoryLimit - allocMemory) / expectedSpillBatchSize);
+
+     // Can't merge more than the merge limit.
+
+     mergeLimit = Math.min(mergeLimit, config.mergeLimit());
+
+     // How many batches to merge?
+
+     int mergeCount = spilledRunsCount - mergeLimit;
+     if (mergeCount <= 0) {
+       return new MergeTask(MergeAction.NONE, 0);
+     }
+
+     // We will merge. This will create yet another spilled
+     // run. Account for that.
+
+     mergeCount += 1;
+
+     // Must merge at least 2 batches to make progress.
+     // This is the (at least one) excess plus the allowance
+     // above for the new one.
+
+     // Can't merge more than the limit.
+
+     mergeCount = Math.min(mergeCount, config.mergeLimit());
+
+     // Do the merge, then loop to try again in case not
+     // all the target batches spilled in one go.
+
+     return new MergeTask(MergeAction.MERGE, mergeCount);
+   }
+
+   /**
+    * Compute the number of rows per batch assuming that the batch is
+    * subject to average internal fragmentation due to power-of-two
+    * rounding on vectors.
+    * <p>
+    * <pre>[____|__$__]</pre>
+    * In the above, the brackets represent the whole vector. The
+    * first half is always full. When the first half filled, the second
+    * half was allocated. On average, the second half will be half full.
+    *
+    * @param batchSize expected batch size, including internal fragmentation
+    * @return number of rows that fit into the batch, between 1 and 65535
+    */
+
+   private int rowsPerBatch(int batchSize) {
+     // Long math: batchSize * 3 overflows int for batches above ~715 MB.
+     long rowCount = (long) batchSize * 3 / 4 / estimatedRowWidth;
+     return (int) Math.max(1, Math.min(rowCount, Character.MAX_VALUE));
+   }
+
+   /**
+    * Compute the expected size of a batch holding the given number of
+    * rows, accounting for internal fragmentation due to power-of-two
+    * rounding on vector allocations.
+    *
+    * @param rowCount the desired number of rows in the batch
+    * @return the size of resulting batch, including power-of-two
+    * rounding, capped at {@code Integer.MAX_VALUE}
+    */
+
+   private int batchForRows(int rowCount) {
+     long size = (long) estimatedRowWidth * rowCount * 4 / 3;
+     return (int) Math.min(size, Integer.MAX_VALUE);
+   }
+
+   // Must spill if accepting the incoming batch would bring allocated
+   // memory to or above the buffer memory limit (the spill point).
+
+   public boolean isSpillNeeded(long allocatedBytes, int incomingSize) {
+     return allocatedBytes + incomingSize >= bufferMemoryLimit;
+   }
+
+   public boolean hasMemoryMergeCapacity(long allocatedBytes, long neededForInMemorySort) {
+     return (freeMemory(allocatedBytes) >= neededForInMemorySort);
+   }
+
+   public long freeMemory(long allocatedBytes) {
+     return memoryLimit - allocatedBytes;
+   }
+
+   public long getMergeMemoryLimit() { return mergeMemoryLimit; }
+   public int getSpillBatchRowCount() { return spillBatchRowCount; }
+   public int getMergeBatchRowCount() { return mergeBatchRowCount; }
+
+   // Primarily for testing
+
+   @VisibleForTesting
+   public long getMemoryLimit() { return memoryLimit; }
+   @VisibleForTesting
+   public int getRowWidth() { return estimatedRowWidth; }
+   @VisibleForTesting
+   public int getInputBatchSize() { return estimatedInputBatchSize; }
+   @VisibleForTesting
+   public int getPreferredSpillBatchSize() { return preferredSpillBatchSize; }
+   @VisibleForTesting
+   public int getPreferredMergeBatchSize() { return preferredMergeBatchSize; }
+   @VisibleForTesting
+   public int getSpillBatchSize() { return expectedSpillBatchSize; }
+   @VisibleForTesting
+   public int getMergeBatchSize() { return expectedMergeBatchSize; }
+   @VisibleForTesting
+   public long getBufferMemoryLimit() { return bufferMemoryLimit; }
+   @VisibleForTesting
+   public boolean mayOverflow() { return potentialOverflow; }
+ }
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
new file mode 100644
index 0000000..d51e007
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SortMetrics.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.exec.ops.OperatorStatReceiver;
+
+/**
+ * Accumulates metrics for the external sort operator. Some values are
+ * also published to the operator's {@link OperatorStatReceiver} as they
+ * change.
+ */
+public class SortMetrics {
+
+  /** Largest buffered batch count reported so far; -1 until the first report. */
+  private int peakBatchCount = -1;
+  /** Total number of rows received from upstream. */
+  private int inputRecordCount = 0;
+  /** Total number of batches received from upstream so far. */
+  private int inputBatchCount = 0;
+
+  /**
+   * Sum of the total number of bytes read from upstream.
+   * This is the raw memory bytes, not actual data bytes.
+   */
+
+  private long totalInputBytes;
+
+  /**
+   * Tracks the minimum amount of remaining memory for use
+   * in populating an operator metric. Starts at {@link Long#MAX_VALUE}
+   * rather than using 0 as an "unset" marker: with a 0 marker, a genuine
+   * free-memory reading of 0 would be overwritten by the next, larger
+   * reading.
+   */
+
+  private long minimumBufferSpace = Long.MAX_VALUE;
+  private final OperatorStatReceiver stats;
+  private int spillCount;
+  private int mergeCount;
+  private long writeBytes;
+
+  /**
+   * @param stats receiver to which metric values are published
+   */
+  public SortMetrics(OperatorStatReceiver stats) {
+    this.stats = stats;
+  }
+
+  /**
+   * Records one incoming batch.
+   *
+   * @param rowCount number of rows in the batch
+   * @param batchSize size of the batch, added to the total input bytes
+   */
+  public void updateInputMetrics(int rowCount, int batchSize) {
+    inputRecordCount += rowCount;
+    inputBatchCount++;
+    totalInputBytes += batchSize;
+  }
+
+  /**
+   * Records a free-memory observation, keeps the smallest value seen so
+   * far, and publishes that minimum as the MIN_BUFFER metric.
+   *
+   * @param freeMem currently free memory; may be zero or negative
+   */
+  public void updateMemory(long freeMem) {
+    minimumBufferSpace = Math.min(minimumBufferSpace, freeMem);
+    stats.setLongStat(ExternalSortBatch.Metric.MIN_BUFFER, minimumBufferSpace);
+  }
+
+  public int getInputRowCount() { return inputRecordCount; }
+  public long getInputBatchCount() { return inputBatchCount; }
+  public long getInputBytes() { return totalInputBytes; }
+
+  /**
+   * Records the current buffered batch count. The PEAK_BATCHES_IN_MEMORY
+   * metric is published only when a new maximum is reached.
+   *
+   * @param bufferedBatchCount number of batches currently buffered
+   */
+  public void updatePeakBatches(int bufferedBatchCount) {
+    if (peakBatchCount < bufferedBatchCount) {
+      peakBatchCount = bufferedBatchCount;
+      stats.setLongStat(ExternalSortBatch.Metric.PEAK_BATCHES_IN_MEMORY, peakBatchCount);
+    }
+  }
+
+  /** Counts one merge, both locally and in the MERGE_COUNT metric. */
+  public void incrMergeCount() {
+    stats.addLongStat(ExternalSortBatch.Metric.MERGE_COUNT, 1);
+    mergeCount++;
+  }
+
+  /** Counts one spill, both locally and in the SPILL_COUNT metric. */
+  public void incrSpillCount() {
+    stats.addLongStat(ExternalSortBatch.Metric.SPILL_COUNT, 1);
+    spillCount++;
+  }
+
+  /**
+   * Replaces (does not add to) the recorded spill write size and publishes
+   * it in megabytes as the SPILL_MB metric.
+   *
+   * @param writeBytes bytes written to spill files
+   */
+  public void updateWriteBytes(long writeBytes) {
+    stats.setDoubleStat(ExternalSortBatch.Metric.SPILL_MB,
+        writeBytes / 1024.0D / 1024.0);
+    this.writeBytes = writeBytes;
+  }
+
+  public int getSpillCount() { return spillCount; }
+  public int getMergeCount() { return mergeCount; }
+  public long getWriteBytes() { return writeBytes; }
+  public int getPeakBatchCount() { return peakBatchCount; }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
new file mode 100644
index 0000000..4231cf4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SorterWrapper.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.config.ExternalSort;
+import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+
+/**
+ * Single-batch sorter using a generated implementation based on the
+ * schema and sort specification. The generated sorter is reused
+ * across batches. The sorter must be closed at each schema change
+ * so that the sorter will generate a new implementation against
+ * the changed schema.
+ */
+
+public class SorterWrapper extends BaseSortWrapper {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SorterWrapper.class);
+
+  /**
+   * Generated sort operation used to sort each incoming batch according to
+   * the sort criteria specified in the {@link ExternalSort} definition of
+   * this operator. Created lazily on first use and discarded by
+   * {@link #close()}.
+   */
+
+  private SingleBatchSorter sorter;
+
+  public SorterWrapper(OperExecContext opContext) {
+    super(opContext);
+  }
+
+  /**
+   * Sorts one batch using the (possibly newly generated) sorter. The
+   * selection vector is passed to both setup and sort; presumably the
+   * sort order is recorded in it -- confirm in SingleBatchSorter.
+   *
+   * @param convertedBatch the batch to sort; cleared if setup/sort reports
+   *        a schema change
+   * @param sv2 selection vector for the batch
+   * @throws UserException (unsupported error) on an unexpected schema change
+   */
+  public void sortBatch(VectorContainer convertedBatch, SelectionVector2 sv2) {
+
+    // Named distinctly from the field to avoid shadowing it.
+    SingleBatchSorter batchSorter = getSorter(convertedBatch);
+    try {
+      batchSorter.setup(context, sv2, convertedBatch);
+      batchSorter.sort(sv2);
+    } catch (SchemaChangeException e) {
+      convertedBatch.clear();
+      throw UserException.unsupportedError(e)
+            .message("Unexpected schema change.")
+            .build(logger);
+    }
+  }
+
+  /** Discards the generated sorter so the next batch generates a new one. */
+  public void close() {
+    sorter = null;
+  }
+
+  private SingleBatchSorter getSorter(VectorAccessible batch) {
+    if (sorter == null) {
+      sorter = newSorter(batch);
+    }
+    return sorter;
+  }
+
+  /** Generates and instantiates a sorter for the schema of {@code batch}. */
+  private SingleBatchSorter newSorter(VectorAccessible batch) {
+    CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(
+        SingleBatchSorter.TEMPLATE_DEFINITION, context.getFunctionRegistry(),
+        context.getOptionSet());
+    ClassGenerator<SingleBatchSorter> g = cg.getRoot();
+    cg.plainJavaCapable(true);
+    // Uncomment this line to debug the generated code. It must stay
+    // disabled in normal operation.
+    // cg.saveCodeForDebugging(true);
+
+    generateComparisons(g, batch, logger);
+    return getInstance(cg, logger);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
new file mode 100644
index 0000000..a6042c6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort.managed;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ops.OperExecContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
+import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.VectorContainer;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Represents the set of spilled batches, including methods to spill and/or
+ * merge a set of batches to produce a new spill file.
+ */
+
+public class SpilledRuns {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpilledRuns.class);
+
+ /**
+ * Manages the set of spill directories and files.
+ */
+
+ private final SpillSet spillSet;
+ private final LinkedList<BatchGroup.SpilledRun> spilledRuns = Lists.newLinkedList();
+
+ /**
+ * Manages the copier used to merge a collection of batches into
+ * a new set of batches.
+ */
+
+ private final PriorityQueueCopierWrapper copierHolder;
+ private BatchSchema schema;
+
+ private final OperExecContext context;
+
+ public SpilledRuns(OperExecContext opContext, SpillSet spillSet, PriorityQueueCopierWrapper copier) {
+ this.context = opContext;
+ this.spillSet = spillSet;
+// copierHolder = new PriorityQueueCopierWrapper(opContext);
+ copierHolder = copier;
+ }
+
+ public void setSchema(BatchSchema schema) {
+ this.schema = schema;
+ for (BatchGroup b : spilledRuns) {
+ b.setSchema(schema);
+ }
+ copierHolder.close();
+ }
+
+ public int size() { return spilledRuns.size(); }
+ public boolean hasSpilled() { return spillSet.hasSpilled(); }
+ public long getWriteBytes() { return spillSet.getWriteBytes(); }
+
+ public static List<BatchGroup> prepareSpillBatches(LinkedList<? extends BatchGroup> source, int spillCount) {
+ List<BatchGroup> batchesToSpill = Lists.newArrayList();
+ spillCount = Math.min(source.size(), spillCount);
+ assert spillCount > 0 : "Spill count to mergeAndSpill must not be zero";
+ for (int i = 0; i < spillCount; i++) {
+ batchesToSpill.add(source.pollFirst());
+ }
+ return batchesToSpill;
+ }
+
+ public void mergeAndSpill(List<BatchGroup> batchesToSpill, int spillBatchRowCount) {
+ spilledRuns.add(safeMergeAndSpill(batchesToSpill, spillBatchRowCount));
+ logger.trace("Completed spill: memory = {}",
+ context.getAllocator().getAllocatedMemory());
+ }
+
+ public void mergeRuns(int targetCount, long mergeMemoryPool, int spillBatchRowCount) {
+
+ long allocated = context.getAllocator().getAllocatedMemory();
+ mergeMemoryPool -= context.getAllocator().getAllocatedMemory();
+ logger.trace("Merging {} on-disk runs, alloc. memory = {}, avail. memory = {}",
+ targetCount, allocated, mergeMemoryPool);
+
+ // Determine the number of runs to merge. The count should be the
+ // target count. However, to prevent possible memory overrun, we
+ // double-check with actual spill batch size and only spill as much
+ // as fits in the merge memory pool.
+
+ int mergeCount = 0;
+ long mergeSize = 0;
+ for (SpilledRun run : spilledRuns) {
+ long batchSize = run.getBatchSize();
+ if (mergeSize + batchSize > mergeMemoryPool) {
+ break;
+ }
+ mergeSize += batchSize;
+ mergeCount++;
+ if (mergeCount == targetCount) {
+ break;
+ }
+ }
+
+ // Must always spill at least 2, even if this creates an over-size
+ // spill file. But, if this is a final consolidation, we may have only
+ // a single batch.
+
+ mergeCount = Math.max(mergeCount, 2);
+ mergeCount = Math.min(mergeCount, spilledRuns.size());
+
+ // Do the actual spill.
+
+ List<BatchGroup> batchesToSpill = prepareSpillBatches(spilledRuns, mergeCount);
+ mergeAndSpill(batchesToSpill, spillBatchRowCount);
+ }
+
+ private BatchGroup.SpilledRun safeMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) {
+ try {
+ return doMergeAndSpill(batchesToSpill, spillBatchRowCount);
+ }
+ // If error is a User Exception, just use as is.
+
+ catch (UserException ue) { throw ue; }
+ catch (Throwable ex) {
+ throw UserException.resourceError(ex)
+ .message("External Sort encountered an error while spilling to disk")
+ .build(logger);
+ }
+ }
+
+ private BatchGroup.SpilledRun doMergeAndSpill(List<? extends BatchGroup> batchesToSpill, int spillBatchRowCount) throws Throwable {
+
+ // Merge the selected set of matches and write them to the
+ // spill file. After each write, we release the memory associated
+ // with the just-written batch.
+
+ String outputFile = spillSet.getNextSpillFile();
+ BatchGroup.SpilledRun newGroup = null;
+ VectorContainer dest = new VectorContainer();
+ try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
+ PriorityQueueCopierWrapper.BatchMerger merger = copierHolder.startMerge(schema, batchesToSpill, dest, spillBatchRowCount)) {
+ newGroup = new BatchGroup.SpilledRun(spillSet, outputFile, context.getAllocator());
+ logger.trace("Spilling {} batches, into spill batches of {} rows, to {}",
+ batchesToSpill.size(), spillBatchRowCount, outputFile);
+
+ // The copier will merge records from the buffered batches into
+ // the outputContainer up to targetRecordCount number of rows.
+ // The actual count may be less if fewer records are available.
+
+ while (merger.next()) {
+
+ // Add a new batch of records (given by merger.getOutput()) to the spill
+ // file.
+ //
+ // note that addBatch also clears the merger's output container
+
+ newGroup.addBatch(dest);
+ }
+ context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
+ newGroup.closeOutputStream();
+ logger.trace("Spilled {} output batches, each of {} by bytes, {} records to {}",
+ merger.getBatchCount(), merger.getRecordCount(),
+ merger.getEstBatchSize(), outputFile);
+ newGroup.setBatchSize(merger.getEstBatchSize());
+ return newGroup;
+ } catch (Throwable e) {
+ // we only need to clean up newGroup if spill failed
+ try {
+ if (newGroup != null) {
+ AutoCloseables.close(e, newGroup);
+ }
+ } catch (Throwable t) { /* close() may hit the same IO issue; just ignore */ }
+
+ throw e;
+ }
+ }
+
+ public SortResults finalMerge(List<? extends BatchGroup> bufferedBatches, VectorContainer container, int mergeRowCount) {
+ List<BatchGroup> allBatches = new LinkedList<>();
+ allBatches.addAll(bufferedBatches);
+ bufferedBatches.clear();
+ allBatches.addAll(spilledRuns);
+ spilledRuns.clear();
+ logger.debug("Starting merge phase. Runs = {}, Alloc. memory = {}",
+ allBatches.size(), context.getAllocator().getAllocatedMemory());
+ return copierHolder.startMerge(schema, allBatches, container, mergeRowCount);
+ }
+
+ public void close() {
+ if (spillSet.getWriteBytes() > 0) {
+ logger.debug("End of sort. Total write bytes: {}, Total read bytes: {}",
+ spillSet.getWriteBytes(), spillSet.getWriteBytes());
+ }
+ RuntimeException ex = null;
+ try {
+ if (spilledRuns != null) {
+ BatchGroup.closeAll(spilledRuns);
+ spilledRuns.clear();
+ }
+ } catch (RuntimeException e) {
+ ex = (ex == null) ? e : ex;
+ }
+ try {
+ copierHolder.close();
+ } catch (RuntimeException e) {
+ ex = (ex == null) ? e : ex;
+ }
+ try {
+ spillSet.close();
+ } catch (RuntimeException e) {
+ ex = (ex == null) ? e : ex;
+ }
+ if (ex != null) {
+ throw ex;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/90f43bff/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 8aedaf6..146df1f 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -34,11 +34,11 @@ drill {
annotations += org.apache.drill.exec.expr.annotations.FunctionTemplate
packages : ${?drill.classpath.scanning.packages} [
- org.apache.drill.exec.expr,
- org.apache.drill.exec.physical,
- org.apache.drill.exec.store,
- org.apache.drill.exec.rpc.user.security,
- org.apache.drill.exec.rpc.security
+ org.apache.drill.exec.expr,
+ org.apache.drill.exec.physical,
+ org.apache.drill.exec.store,
+ org.apache.drill.exec.rpc.user.security,
+ org.apache.drill.exec.rpc.security
]
}
}
@@ -241,19 +241,19 @@ drill.exec: {
// Set this to true to use the legacy, unmanaged version.
// Disabled in the intial commit, to be enabled after
// tests are committed.
- disable_managed: true
+ disable_managed: true,
// Limit on the number of batches buffered in memory.
// Primarily for testing.
// 0 = unlimited
- batch_limit: 0
+ batch_limit: 0,
// Limit on the amount of memory used for xsort. Overrides the
// value provided by Foreman. Primarily for testing.
// 0 = unlimited, Supports HOCON memory suffixes.
- mem_limit: 0
+ mem_limit: 0,
// Limit on the number of spilled batches that can be merged in
// a single pass. Limits the number of open file handles.
// 0 = unlimited
- merge_limit: 0
+ merge_limit: 0,
spill: {
// Deprecated for managed xsort; used only by legacy xsort
group.size: 40000,