You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 00:19:12 UTC
[1/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate
Repository: drill
Updated Branches:
refs/heads/master be43a9edd -> c16e5f807
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 7cc43ad..21d5a4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -24,8 +24,8 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
@@ -40,13 +40,17 @@ public interface HashAggregator {
new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
public static enum AggOutcome {
- RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
+ RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
}
+  /**
+   * Result returned by {@code outputCurrentBatch()}.
+   */
+  public enum AggIterOutcome {
+    /** A batch was returned. */
+    AGG_OK,
+    /** End of data. */
+    AGG_NONE,
+    /** Nothing returned yet; call again. */
+    AGG_RESTART
+  }
+
public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
- OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
- VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
+ OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
+ VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
public abstract IterOutcome getOutcome();
@@ -60,6 +64,9 @@ public interface HashAggregator {
public abstract boolean buildComplete();
- public abstract IterOutcome outputCurrentBatch();
+ public abstract AggIterOutcome outputCurrentBatch();
+
+ public abstract boolean earlyOutput();
+ public abstract RecordBatch getNewIncoming();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
new file mode 100644
index 0000000..b05353e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.aggregate;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+/**
+ * A record batch that replaces "incoming" for one spilled partition: instead of pulling
+ * batches from an upstream operator, it reads them back from the partition's spill file.
+ * <p>
+ * The constructor opens the spill file and reads the first batch. Every later {@link #next()}
+ * reads the following batch into the same {@link VectorContainer}, because other classes
+ * (e.g. HashAggBatch, HashTable) may hold a reference to that container as their "incoming".
+ * After the last batch has been read, {@link #next()} closes this record batch, which clears
+ * the container, closes the input stream and deletes the spill file.
+ * <p>
+ * The {@link SpillSet} is supplied by the caller and is only used here to open and delete
+ * this partition's file; it is intentionally not closed by this class.
+ */
+public class SpilledRecordbatch implements CloseableRecordBatch {
+  private VectorContainer container;
+  private InputStream spillStream;
+  private int spilledBatches; // number of batches still to be read from the spill file
+  private final FragmentContext context;
+  private final BatchSchema schema;
+  private final OperatorContext oContext;
+  private final SpillSet spillSet; // supplied by the caller; not closed here
+  private final String spillFile;
+  // close() can be reached from next() (end of data), kill(), and the owner; only the first call acts
+  private boolean closed;
+  VectorAccessibleSerializable vas;
+
+  /**
+   * Opens {@code spillFile} and reads its first batch.
+   *
+   * @param spillFile name of the spill file holding this partition's batches
+   * @param spilledBatches number of batches to read from {@code spillFile}
+   * @param context fragment context, returned by {@link #getContext()}
+   * @param schema schema of the spilled batches, returned by {@link #getSchema()}
+   * @param oContext operator context whose allocator is used for the deserialized vectors
+   * @param spillSet spill set used to open and delete {@code spillFile}; not closed here
+   * @throws UserException (data read error) if the spill file cannot be opened or read
+   */
+  public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context,
+                            BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+    this.context = context;
+    this.schema = schema;
+    this.spilledBatches = spilledBatches;
+    this.oContext = oContext;
+    this.spillSet = spillSet;
+    this.spillFile = spillFile;
+    vas = new VectorAccessibleSerializable(oContext.getAllocator());
+    container = vas.get();
+
+    try {
+      this.spillStream = this.spillSet.openForInput(spillFile);
+    } catch (IOException e) {
+      // Report open failures the same way next() reports read failures
+      throw UserException.dataReadError(e).addContext("Failed opening a spill file").build(HashAggTemplate.logger);
+    }
+
+    next(); // initialize the container
+  }
+
+  @Override
+  public SelectionVector2 getSelectionVector2() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SelectionVector4 getSelectionVector4() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public TypedFieldId getValueVectorId(SchemaPath path) {
+    return container.getValueVectorId(path);
+  }
+
+  @Override
+  public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
+    return container.getValueAccessorById(clazz, ids);
+  }
+
+  @Override
+  public Iterator<VectorWrapper<?>> iterator() {
+    return container.iterator();
+  }
+
+  @Override
+  public FragmentContext getContext() { return context; }
+
+  @Override
+  public BatchSchema getSchema() { return schema; }
+
+  @Override
+  public WritableBatch getWritableBatch() {
+    return WritableBatch.get(this);
+  }
+
+  @Override
+  public VectorContainer getOutgoingContainer() { return container; }
+
+  @Override
+  public int getRecordCount() { return container.getRecordCount(); }
+
+  @Override
+  public void kill(boolean sendUpstream) {
+    close(); // delete the current spill file
+  }
+
+  /**
+   * Reads the next batch from the spill file into the container.
+   *
+   * @return {@link IterOutcome#OK} when a batch was read; {@link IterOutcome#NONE} when all
+   *         batches have been read, in which case this record batch has also been closed
+   * @throws IllegalStateException if batches remain but the spill stream is already closed
+   * @throws UserException (data read error) if reading from the spill file fails
+   */
+  @Override
+  public IterOutcome next() {
+    if (spilledBatches <= 0) { // no more batches to read in this partition
+      close();
+      return IterOutcome.NONE;
+    }
+    if (spillStream == null) {
+      throw new IllegalStateException("Spill stream was null");
+    }
+
+    long position = spillSet.getPosition(spillStream);
+    if (position < 0) {
+      HashAggTemplate.logger.warn("Position is {} for stream {}", position, spillStream.toString());
+    }
+
+    try {
+      if (container.getNumberOfColumns() > 0) {
+        // Container already initialized: read into it, because other classes (e.g. HashAggBatch,
+        // HashTable) may have a reference to this container (as an "incoming")
+        vas.readFromStreamWithContainer(container, spillStream);
+      } else {
+        // First time: let the deserializer create the container
+        vas.readFromStream(spillStream);
+        container = vas.get();
+      }
+    } catch (IOException e) {
+      throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger);
+    }
+
+    spilledBatches--; // one less batch to read
+    return IterOutcome.OK;
+  }
+
+  /**
+   * Clears the container, closes the spill stream and deletes the spill file. Calls after the
+   * first one do nothing. The {@link SpillSet} is left open: it was supplied by the caller,
+   * which may still use it for other spill files.
+   *
+   * @throws RuntimeException wrapping an {@link IOException} raised while releasing the
+   *         stream or the file
+   */
+  @Override
+  public void close() {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    container.clear();
+    try {
+      try {
+        if (spillStream != null) {
+          spillStream.close();
+        }
+      } finally {
+        spillStream = null;
+        spillSet.delete(spillFile); // remove the file even if closing the stream failed
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 77ebb0d..436480e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -114,7 +114,7 @@ public class ChainedHashTable {
private HashTableConfig htConfig;
private final FragmentContext context;
private final BufferAllocator allocator;
- private final RecordBatch incomingBuild;
+ private RecordBatch incomingBuild;
private final RecordBatch incomingProbe;
private final RecordBatch outgoing;
@@ -129,14 +129,18 @@ public class ChainedHashTable {
this.outgoing = outgoing;
}
- public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
+  /**
+   * Replaces the record batch stored as the build-side incoming ({@code incomingBuild}).
+   *
+   * @param newIncomingBuild the batch to use as the build-side incoming from now on
+   */
+  public void updateIncoming(RecordBatch newIncomingBuild) {
+    incomingBuild = newIncomingBuild;
+  }
+
+ public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws ClassTransformationException,
IOException, SchemaChangeException {
CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions());
top.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
// This code is called from generated code, so to step into this code,
// persist the code generated in HashAggBatch also.
-// top.saveCodeForDebugging(true);
+ // top.saveCodeForDebugging(true);
ClassGenerator<HashTable> cg = top.getRoot();
ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index ef7dadf..9c93c16 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.common;
import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
@@ -43,7 +44,7 @@ public interface HashTable {
*/
static final public float DEFAULT_LOAD_FACTOR = 0.75f;
- static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;}
+ static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
/**
* The batch size used for internal batch holders
@@ -51,30 +52,35 @@ public interface HashTable {
static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
static final public int BATCH_MASK = 0x0000FFFF;
- /** Variable width vector size in bytes */
- public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE;
+ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
- RecordBatch incomingBuild, RecordBatch incomingProbe,
- RecordBatch outgoing, VectorContainer htContainerOrig);
+ public void updateBatches() throws SchemaChangeException;
- public void updateBatches();
+ public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
- public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount);
+ public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
- public int containsKey(int incomingRowIdx, boolean isProbe);
+ public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
public void getStats(HashTableStats stats);
+ public long extraMemoryNeededForResize();
+
public int size();
public boolean isEmpty();
public void clear();
+ public void reinit(RecordBatch newIncoming);
+
+ public void reset();
+
+ public void setMaxVarcharSize(int size);
+
public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
- public void addNewKeyBatch();
+ // public void addNewKeyBatch();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
index c494c85..7baa9d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java
@@ -26,6 +26,13 @@ public class HashTableStats {
public HashTableStats() {
}
+
+  /**
+   * Adds the counters of another {@code HashTableStats} to this one
+   * (buckets, entries, number of resizings and resizing time).
+   *
+   * @param newStats the statistics to accumulate; not modified
+   */
+  public void addStats(HashTableStats newStats) {
+    numBuckets += newStats.numBuckets;
+    numEntries += newStats.numEntries;
+    numResizing += newStats.numResizing;
+    resizingTime += newStats.resizingTime;
+  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 96f9422..3209c27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -25,6 +25,7 @@ import javax.inject.Named;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -50,14 +51,19 @@ public abstract class HashTableTemplate implements HashTable {
// A hash 'bucket' consists of the start index to indicate start of a hash chain
// Array of start indexes. start index is a global index across all batch holders
+ // This is the "classic hash table", where Hash-Value % size-of-table yields
+ // the offset/position (in the startIndices) of the beginning of the hash chain.
private IntVector startIndices;
// Array of batch holders..each batch holder can hold up to BATCH_SIZE entries
private ArrayList<BatchHolder> batchHolders;
- // Size of the hash table in terms of number of buckets
+ // Current size of the hash table in terms of number of buckets
private int tableSize = 0;
+ // Original size of the hash table (needed when re-initializing)
+ private int originalTableSize;
+
// Threshold after which we rehash; It must be the tableSize * loadFactor
private int threshold;
@@ -95,6 +101,8 @@ public abstract class HashTableTemplate implements HashTable {
private int resizingTime = 0;
+ private int maxVarcharSize = 8; // for varchar allocation
+
// This class encapsulates the links, keys and values for up to BATCH_SIZE
// *unique* records. Thus, suppose there are N incoming record batches, each
// of size BATCH_SIZE..but they have M unique keys altogether, the number of
@@ -134,7 +142,9 @@ public abstract class HashTableTemplate implements HashTable {
if (vv instanceof FixedWidthVector) {
((FixedWidthVector) vv).allocateNew(BATCH_SIZE);
} else if (vv instanceof VariableWidthVector) {
- ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE);
+ long beforeMem = allocator.getAllocatedMemory();
+ ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE);
+ logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize);
} else {
vv.allocateNew();
}
@@ -166,7 +176,7 @@ public abstract class HashTableTemplate implements HashTable {
hashValues.getMutator().setValueCount(size);
}
- protected void setup() {
+ protected void setup() throws SchemaChangeException {
setupInterior(incomingBuild, incomingProbe, outgoing, htContainer);
}
@@ -175,7 +185,7 @@ public abstract class HashTableTemplate implements HashTable {
// currentIdxHolder with the index of the next link.
private boolean isKeyMatch(int incomingRowIdx,
IndexPointer currentIdxHolder,
- boolean isProbe) {
+ boolean isProbe) throws SchemaChangeException {
int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
boolean match = false;
@@ -201,7 +211,7 @@ public abstract class HashTableTemplate implements HashTable {
// Insert a new <key1, key2...keyN> entry coming from the incoming batch into the hash table
// container at the specified index
- private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) {
+ private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException {
int currentIdxWithinBatch = currentIdx & BATCH_MASK;
setValue(incomingRowIdx, currentIdxWithinBatch);
@@ -405,36 +415,34 @@ public abstract class HashTableTemplate implements HashTable {
@Named("incomingBuild") RecordBatch incomingBuild,
@Named("incomingProbe") RecordBatch incomingProbe,
@Named("outgoing") RecordBatch outgoing,
- @Named("htContainer") VectorContainer htContainer) {
+ @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException {
}
@RuntimeOverridden
protected boolean isKeyMatchInternalBuild(
- @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
return false;
}
@RuntimeOverridden
protected boolean isKeyMatchInternalProbe(
- @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
return false;
}
@RuntimeOverridden
- protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException {
}
@RuntimeOverridden
- protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+ protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException {
}
} // class BatchHolder
@Override
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
- RecordBatch incomingBuild, RecordBatch incomingProbe,
- RecordBatch outgoing, VectorContainer htContainerOrig) {
+ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
@@ -465,6 +473,7 @@ public abstract class HashTableTemplate implements HashTable {
if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
}
+ originalTableSize = tableSize ; // retain original size
threshold = (int) Math.ceil(tableSize * loadf);
@@ -476,13 +485,17 @@ public abstract class HashTableTemplate implements HashTable {
batchHolders = new ArrayList<BatchHolder>();
// First BatchHolder is created when the first put request is received.
- doSetup(incomingBuild, incomingProbe);
+ try {
+ doSetup(incomingBuild, incomingProbe);
+ } catch (SchemaChangeException e) {
+ throw new IllegalStateException("Unexpected schema change", e);
+ }
currentIdxHolder = new IndexPointer();
}
@Override
- public void updateBatches() {
+ public void updateBatches() throws SchemaChangeException {
doSetup(incomingBuild, incomingProbe);
for (BatchHolder batchHolder : batchHolders) {
batchHolder.setup();
@@ -497,6 +510,21 @@ public abstract class HashTableTemplate implements HashTable {
return numResizing;
}
+  /**
+   * Estimates the extra memory a resize of the hash table would need. A resize doubles the
+   * number of buckets (rounded up to a power of two and capped at {@code MAXIMUM_CAPACITY}),
+   * so this is the size in bytes of the new start-indices vector.
+   *
+   * @return size in bytes of the doubled start-indices vector, or 0 if the table is already
+   *         at {@code MAXIMUM_CAPACITY} and will not be resized
+   */
+  @Override
+  public long extraMemoryNeededForResize() {
+    if (tableSize == MAXIMUM_CAPACITY) {
+      return 0; // will not resize
+    }
+    int newSize = roundUpToPowerOf2(2 * tableSize);
+    if (newSize > MAXIMUM_CAPACITY) {
+      newSize = MAXIMUM_CAPACITY;
+    }
+    // Multiply in long arithmetic: in int, newSize * 4 overflows once newSize >= 2^29
+    return 4L /* sizeof(int) */ * newSize;
+  }
+
@Override
public int size() {
return numEntries;
@@ -526,7 +554,7 @@ public abstract class HashTableTemplate implements HashTable {
batchHolders = null;
}
startIndices.clear();
- currentIdxHolder = null;
+ // currentIdxHolder = null; // keep IndexPointer in case HT is reused
numEntries = 0;
}
@@ -544,86 +572,69 @@ public abstract class HashTableTemplate implements HashTable {
return rounded;
}
- @Override
- public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) {
- put(incomingRowIdx, htIdxHolder);
+ public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
+ return getHashBuild(incomingRowIdx);
}
- private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
+  /**
+   * Inserts the key(s) of an incoming row into the hash table, using the hash code computed
+   * by getHashCode() above. The hash code selects a bucket in startIndices; the bucket's hash
+   * chain is searched for a matching key, and if none is found the key values are stored into
+   * a batch holder and linked at the end of the chain (updating its "links" member). Last, the
+   * index holder is set to the key's global index (batch + batch-offset), so that the caller
+   * can store the remaining parts of the row into a matching batch outside the hash table.
+   *
+   * @param incomingRowIdx - position of the incoming row
+   * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch)
+   * @param hashCode - computed over the key(s) by calling getHashCode()
+   * @return KEY_PRESENT if the key(s) were already in the table; otherwise NEW_BATCH_ADDED if a
+   *         new batch holder was added for the new key, KEY_ADDED_LAST if the new key took the
+   *         last slot of the current batch holder, or KEY_ADDED
+   * @throws SchemaChangeException propagated from the key matching and insertion code
+   */
+ @Override
+ public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
- int hash = getHashBuild(incomingRowIdx);
- int i = getBucketIndex(hash, numBuckets());
- int startIdx = startIndices.getAccessor().get(i);
+ int bucketIndex = getBucketIndex(hashCode, numBuckets());
+ int startIdx = startIndices.getAccessor().get(bucketIndex);
int currentIdx;
- int currentIdxWithinBatch;
- BatchHolder bh;
BatchHolder lastEntryBatch = null;
int lastEntryIdxWithinBatch = EMPTY_SLOT;
+ // if startIdx is non-empty, follow the hash chain links until we find a matching
+ // key or reach the end of the chain (and remember the last link there)
+ for ( currentIdxHolder.value = startIdx;
+ currentIdxHolder.value != EMPTY_SLOT;
+ /* isKeyMatch() below also advances the currentIdxHolder to the next link */) {
- if (startIdx == EMPTY_SLOT) {
- // this is the first entry in this bucket; find the first available slot in the
- // container of keys and values
- currentIdx = freeIndex++;
- addBatchIfNeeded(currentIdx);
+ // remember the current link, which would be the last when the next link is empty
+ lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
+ lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
- if (EXTRA_DEBUG) {
- logger.debug("Empty bucket index = {}. incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i,
- incomingRowIdx, currentIdx);
+ if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
+ htIdxHolder.value = currentIdxHolder.value;
+ return PutStatus.KEY_PRESENT;
}
-
- insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
- // update the start index array
- startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
}
- currentIdx = startIdx;
- boolean found = false;
-
- bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
- currentIdxHolder.value = currentIdx;
-
- // if startIdx is non-empty, follow the hash chain links until we find a matching
- // key or reach the end of the chain
- while (true) {
- currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
+ // no match was found, so insert a new entry
+ currentIdx = freeIndex++;
+ boolean addedBatch = addBatchIfNeeded(currentIdx);
- if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
- htIdxHolder.value = currentIdxHolder.value;
- found = true;
- break;
- } else if (currentIdxHolder.value == EMPTY_SLOT) {
- lastEntryBatch = bh;
- lastEntryIdxWithinBatch = currentIdxWithinBatch;
- break;
- } else {
- bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK);
- lastEntryBatch = bh;
- }
+ if (EXTRA_DEBUG) {
+ logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
}
- if (!found) {
- // no match was found, so insert a new entry
- currentIdx = freeIndex++;
- addBatchIfNeeded(currentIdx);
+ insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
- if (EXTRA_DEBUG) {
- logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
- }
-
- insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch);
- htIdxHolder.value = currentIdx;
- return PutStatus.KEY_ADDED;
+ // if there was no hash chain at this bucket, need to update the start index array
+ if (startIdx == EMPTY_SLOT) {
+ startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
}
-
- return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
+ htIdxHolder.value = currentIdx;
+ return addedBatch ? PutStatus.NEW_BATCH_ADDED :
+ ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
+ PutStatus.KEY_ADDED_LAST : // the last key in the batch
+ PutStatus.KEY_ADDED; // otherwise
}
- private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) {
-
- addBatchIfNeeded(currentIdx);
+ private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException {
BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
@@ -640,60 +651,39 @@ public abstract class HashTableTemplate implements HashTable {
// Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
@Override
- public int containsKey(int incomingRowIdx, boolean isProbe) {
+ public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx);
- int i = getBucketIndex(hash, numBuckets());
-
- int currentIdx = startIndices.getAccessor().get(i);
-
- if (currentIdx == EMPTY_SLOT) {
- return -1;
- }
-
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
- currentIdxHolder.value = currentIdx;
+ int bucketIndex = getBucketIndex(hash, numBuckets());
- boolean found = false;
-
- while (true) {
+ for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex);
+ currentIdxHolder.value != EMPTY_SLOT; ) {
+ BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) {
- found = true;
- break;
- } else if (currentIdxHolder.value == EMPTY_SLOT) {
- break;
- } else {
- bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK);
+ return currentIdxHolder.value;
}
}
-
- return found ? currentIdxHolder.value : -1;
+ return -1;
}
// Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied
// currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds
- // the capacity, we will add a new BatchHolder.
- private BatchHolder addBatchIfNeeded(int currentIdx) {
+ // the capacity, we will add a new BatchHolder. Return true if a new batch was added.
+ private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException {
int totalBatchSize = batchHolders.size() * BATCH_SIZE;
if (currentIdx >= totalBatchSize) {
- BatchHolder bh = addBatchHolder();
+ BatchHolder bh = newBatchHolder(batchHolders.size());
+ batchHolders.add(bh);
+ bh.setup();
if (EXTRA_DEBUG) {
logger.debug("HashTable: Added new batch. Num batches = {}.", batchHolders.size());
}
- return bh;
- } else {
- return batchHolders.get(batchHolders.size() - 1);
+ return true;
}
+ return false;
}
- private BatchHolder addBatchHolder() {
- BatchHolder bh = newBatchHolder(batchHolders.size());
- batchHolders.add(bh);
- bh.setup();
- return bh;
- }
-
- protected BatchHolder newBatchHolder(int index) {
+ protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code
return new BatchHolder(index);
}
@@ -755,6 +745,34 @@ public abstract class HashTableTemplate implements HashTable {
numResizing++;
}
+  /**
+   * Re-initializes the hash table to its original size: frees all current batch holders and
+   * the bucket array, then allocates an empty bucket array ({@code startIndices}) of the
+   * original table size, so the table can be reused.
+   * <p>
+   * NOTE(review): {@code threshold} is not recomputed here, so it still reflects the size the
+   * table had before the reset; the next resize therefore happens later than for a fresh table.
+   * Confirm this is acceptable.
+   */
+  @Override
+  public void reset() {
+    clear(); // free the memory of all current batch holders and of the bucket array
+
+    freeIndex = 0; // all batch holders are gone
+    batchHolders = new ArrayList<BatchHolder>();
+    // Keep tableSize consistent with the re-allocated bucket array (it is also what
+    // extraMemoryNeededForResize() bases its estimate on)
+    tableSize = originalTableSize;
+    startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT);
+  }
+
+  /**
+   * Resets the hash table (see {@link #reset()}) and switches it to a new build-side incoming
+   * batch, re-running the setup via {@link #updateBatches()} so it uses the new batch.
+   *
+   * @param newIncoming the batch to use as the build-side incoming from now on
+   * @throws IllegalStateException if the setup reports a schema change or an index error
+   */
+  @Override
+  public void reinit(RecordBatch newIncoming) {
+    incomingBuild = newIncoming;
+    reset();
+    try {
+      updateBatches(); // re-run the setup against the new incoming
+    } catch (SchemaChangeException e) {
+      throw new IllegalStateException("Unexpected schema change", e);
+    } catch (IndexOutOfBoundsException ioob) {
+      throw new IllegalStateException("reinit update batches", ioob);
+    }
+  }
+
@Override
public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) {
assert batchIdx < batchHolders.size();
@@ -775,17 +793,20 @@ public abstract class HashTableTemplate implements HashTable {
}
@Override
+ public void setMaxVarcharSize(int size) { maxVarcharSize = size; }
+
+/* @Override
public void addNewKeyBatch() {
int numberOfBatches = batchHolders.size();
this.addBatchHolder();
freeIndex = numberOfBatches * BATCH_SIZE;
}
-
+*/
// These methods will be code-generated in the context of the outer class
- protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
+ protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException;
- protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx);
+ protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
- protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx);
+ protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index e2c016b..4af1664 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -315,7 +315,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Create the chained hash table
final ChainedHashTable ht =
new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
- hashTable = ht.createAndSetupHashTable(null);
+ hashTable = ht.createAndSetupHashTable(null, 1);
}
public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {
@@ -374,7 +374,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// For every record in the build batch , hash the key columns
for (int i = 0; i < currentRecordCount; i++) {
- hashTable.put(i, htIndex, 1 /* retry count */);
+ int hashCode = hashTable.getHashCode(i);
+ hashTable.put(i, htIndex, hashCode);
/* Use the global index returned by the hash table, to store
* the current record index and batch index. This will be used
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index 4cb2bae..a1b8169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
/**
* Given a record batch or vector container, determines the actual memory
@@ -68,14 +69,14 @@ public class RecordBatchSizer {
public int capacity;
public int density;
public int dataSize;
+ public boolean variableWidth;
- public ColumnSize(ValueVector v) {
- metadata = v.getField();
+ public ColumnSize(ValueVector vv) {
+ metadata = vv.getField();
stdSize = TypeHelper.getSize(metadata.getType());
// Can't get size estimates if this is an empty batch.
-
- int rowCount = v.getAccessor().getValueCount();
+ int rowCount = vv.getAccessor().getValueCount();
if (rowCount == 0) {
estSize = stdSize;
return;
@@ -84,17 +85,17 @@ public class RecordBatchSizer {
// Total size taken by all vectors (and underlying buffers)
// associated with this vector.
- totalSize = v.getAllocatedByteCount();
+ totalSize = vv.getAllocatedByteCount();
// Capacity is the number of values that the vector could
// contain. This is useful only for fixed-length vectors.
- capacity = v.getValueCapacity();
+ capacity = vv.getValueCapacity();
// The amount of memory consumed by the payload: the actual
// data stored in the vectors.
- dataSize = v.getPayloadByteCount();
+ dataSize = vv.getPayloadByteCount();
// Determine "density" the number of rows compared to potential
// capacity. Low-density batches occur at block boundaries, ends
@@ -105,6 +106,7 @@ public class RecordBatchSizer {
density = roundUp(dataSize * 100, totalSize);
estSize = roundUp(dataSize, rowCount);
+ variableWidth = vv instanceof VariableWidthVector ;
}
@Override
@@ -155,6 +157,7 @@ public class RecordBatchSizer {
* vectors are partially full; prevents overestimating row width.
*/
private int netRowWidth;
+ private int netRowWidthCap50;
private boolean hasSv2;
private int sv2Size;
private int avgDensity;
@@ -167,6 +170,18 @@ public class RecordBatchSizer {
batch.getSelectionVector2() : null);
}
+ /**
+ * Maximum estimated per-row width of any top-level column (a map counts as the sum of its members); used for memory estimation in case of Varchars
+ */
+ public int maxSize;
+ /**
+ * Number of nullable columns; used for memory estimation
+ */
+ public int numNullables;
+ /**
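+ * Measure the columns of the given vector container, with no selection vector.
+ * @param va the vector container (e.g. a record batch) whose columns are measured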
+ */
public RecordBatchSizer(VectorAccessible va) {
this(va, null);
}
@@ -174,7 +189,9 @@ public class RecordBatchSizer {
public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) {
rowCount = va.getRecordCount();
for (VectorWrapper<?> vw : va) {
- measureColumn(vw);
+ int size = measureColumn(vw.getValueVector());
+ if ( size > maxSize ) { maxSize = size; }
+ if ( vw.getField().isNullable() ) { numNullables++; }
}
if (rowCount > 0) {
@@ -208,32 +225,45 @@ public class RecordBatchSizer {
totalBatchSize += sv2Size;
}
- private void measureColumn(VectorWrapper<?> vw) {
- measureColumn(vw.getValueVector());
+ /**
+ * Round up (if needed) to the next power of 2, clamped to the range [2, 64]
+ * @param arg Number to round up (values of 2 or less yield 2; values above 32 yield 64)
+ * @return power of 2 result
+ */
+ private int roundUpToPowerOf2(int arg) {
+ if ( arg <= 2 ) { return 2; }
+ if ( arg <= 4 ) { return 4; }
+ if ( arg <= 8 ) { return 8; }
+ if ( arg <= 16 ) { return 16; }
+ if ( arg <= 32 ) { return 32; }
+ return 64;
}
-
- private void measureColumn(ValueVector v) {
-
+ private int measureColumn(ValueVector vv) {
// Maps consume no size themselves. However, their contained
// vectors do consume space, so visit columns recursively.
-
- if (v.getField().getType().getMinorType() == MinorType.MAP) {
- expandMap((AbstractMapVector) v);
- return;
+ if (vv.getField().getType().getMinorType() == MinorType.MAP) {
+ return expandMap((AbstractMapVector) vv);
}
- ColumnSize colSize = new ColumnSize(v);
+
+ ColumnSize colSize = new ColumnSize(vv);
columnSizes.add(colSize);
stdRowWidth += colSize.stdSize;
totalBatchSize += colSize.totalSize;
netBatchSize += colSize.dataSize;
netRowWidth += colSize.estSize;
+ netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize :
+ 8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) );
+ // TODO: change the 8 above to 4 once DRILL-5446 is fixed
+ return colSize.estSize;
}
- private void expandMap(AbstractMapVector mapVector) {
+ private int expandMap(AbstractMapVector mapVector) {
+ int accum = 0;
for (ValueVector vector : mapVector) {
- measureColumn(vector);
+ accum += measureColumn(vector);
}
+ return accum;
}
public static int roundUp(int num, int denom) {
@@ -247,10 +277,18 @@ public class RecordBatchSizer {
public int stdRowWidth() { return stdRowWidth; }
public int grossRowWidth() { return grossRowWidth; }
public int netRowWidth() { return netRowWidth; }
+ /**
+ * Compute the "real" width of the row, taking into account each varchar column size
+ * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation)
+ * and null marking columns.
+ * @return "real" width of the row
+ */
+ public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; }
public int actualSize() { return totalBatchSize; }
public boolean hasSv2() { return hasSv2; }
public int avgDensity() { return avgDensity; }
public int netSize() { return netBatchSize; }
+ public int maxSize() { return maxSize; }
public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 74e1fb5..87eebc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -30,11 +30,13 @@ import java.util.List;
import java.util.Set;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -84,7 +86,7 @@ public class SpillSet {
* Given a manager-specific input stream, return the current read position.
* Used to report total read bytes.
*
- * @param outputStream input stream created by the file manager
+ * @param inputStream input stream created by the file manager
* @return
*/
long getReadBytes(InputStream inputStream);
@@ -346,7 +348,6 @@ public class SpillSet {
*/
private final String spillDirName;
- private final String spillFileName;
private int fileCount = 0;
@@ -356,16 +357,34 @@ public class SpillSet {
private long writeBytes;
- public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
- this(context, popConfig, null, "spill");
- }
-
- public SpillSet(FragmentContext context, PhysicalOperator popConfig,
- String opName, String fileName) {
+ public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) {
FragmentHandle handle = context.getHandle();
+ String operName = "Unknown";
+
+ // Set the spill options from the configuration
DrillConfig config = context.getConfig();
- spillFileName = fileName;
- List<String> dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+ String spillFs;
+ List<String> dirList;
+
+ // Set the operator name (used as part of the spill directory name),
+ // and set operator-specific options (the config file defaults to using the
+ // common options; users may override those - per operator)
+ switch (optype) {
+ case EXTERNAL_SORT:
+ operName = "Sort";
+ spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
+ dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
+ break;
+ case HASH_AGGREGATE:
+ operName = "HashAgg";
+ spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM);
+ dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS);
+ break;
+ default: // just use the common ones
+ spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM);
+ dirList = config.getStringList(ExecConstants.SPILL_DIRS);
+ }
+
dirs = Iterators.cycle(dirList);
// If more than one directory, semi-randomly choose an offset into
@@ -386,23 +405,18 @@ public class SpillSet {
// system is selected and impersonation is off. (We use that
// as a proxy for a non-production Drill setup.)
- String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM);
boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED);
if (spillFs.startsWith("file:///") && ! impersonationEnabled) {
fileManager = new LocalFileManager(spillFs);
} else {
fileManager = new HadoopFileManager(spillFs);
}
- spillDirName = String.format(
- "%s_major%d_minor%d_op%d%s",
- QueryIdHelper.getQueryId(handle.getQueryId()),
- handle.getMajorFragmentId(),
- handle.getMinorFragmentId(),
- popConfig.getOperatorId(),
- (opName == null) ? "" : "_" + opName);
+
+ spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()),
+ operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
}
- public String getNextSpillFile() {
+ public String getNextSpillFile(String extraName) {
// Identify the next directory from the round-robin list to
// the file created from this round of spilling. The directory must
@@ -411,7 +425,12 @@ public class SpillSet {
String spillDir = dirs.next();
String currSpillPath = Joiner.on("/").join(spillDir, spillDirName);
currSpillDirs.add(currSpillPath);
- String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount);
+
+ String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount);
+ if ( extraName != null ) {
+ outputFile += "_" + extraName;
+ }
+
try {
fileManager.deleteOnExit(currSpillPath);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 69e9b4c..4d5f290 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.physical.impl.xsort.MSortTemplate;
import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch;
import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun;
+
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -399,7 +401,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
allocator = oContext.getAllocator();
opCodeGen = new OperatorCodeGenerator(context, popConfig);
- spillSet = new SpillSet(context, popConfig, "sort", "run");
+ spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT);
copierHolder = new CopierHolder(context, allocator, opCodeGen);
configure(context.getConfig());
}
@@ -1390,7 +1392,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// spill file. After each write, we release the memory associated
// with the just-written batch.
- String outputFile = spillSet.getNextSpillFile();
+ String outputFile = spillSet.getNextSpillFile(null);
stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount());
BatchGroup.SpilledRun newGroup = null;
try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill);
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 732ff15..8c69930 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -51,7 +51,7 @@ import java.util.List;
public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
- protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
+ public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2};
protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase
protected List<NamedExpression> keys = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
index b911f6b..460ee8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java
@@ -61,6 +61,9 @@ public abstract class AggPruleBase extends Prule {
// currently won't generate a 2 phase plan.
protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) {
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr
+ return true;
+ }
RelNode child = call.rel(0).getInputs().get(0);
boolean smallInput = child.getRows() < settings.getSliceTarget();
if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) {
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
index c382af6..09d33fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java
@@ -73,7 +73,7 @@ public class HashAggPrel extends AggPrelBase implements Prel{
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
Prel child = (Prel) this.getInput();
- HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys, aggExprs, 1.0f);
+ HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), operPhase, keys, aggExprs, 1.0f);
return creator.addMetadata(this, g);
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 648adb7..15314ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -133,6 +133,9 @@ public class PlannerSettings implements Context{
the need to turn off join optimization may go away.
*/
public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true);
+ // for testing purposes
+ public static final String FORCE_2PHASE_AGGR_KEY = "planner.force_2phase_aggr";
+ public static final BooleanValidator FORCE_2PHASE_AGGR = new BooleanValidator(FORCE_2PHASE_AGGR_KEY, false);
public OptionManager options = null;
public FunctionImplementationRegistry functionImplementationRegistry = null;
@@ -274,6 +277,8 @@ public class PlannerSettings implements Context{
return options.getOption(TYPE_INFERENCE);
}
+ public boolean isForce2phaseAggr() { return options.getOption(FORCE_2PHASE_AGGR);} // for testing
+
public long getInSubqueryThreshold() {
return options.getOption(IN_SUBQUERY_THRESHOLD);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 04cf8fc..0daa6b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -34,7 +34,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
* <p>
* A key thing to know is that the Iterator provided by a record batch must
* align with the rank positions of the field IDs provided using
- * {@link getValueVectorId}.
+ * {@link #getValueVectorId}.
* </p>
*/
public interface RecordBatch extends VectorAccessible {
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 8492f36..c2a4d65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -93,6 +93,10 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD,
PlannerSettings.QUOTING_IDENTIFIERS,
PlannerSettings.JOIN_OPTIMIZATION,
+ PlannerSettings.FORCE_2PHASE_AGGR, // for testing
+ ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR,
+ ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR,
+ ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning
ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
ExecConstants.OUTPUT_FORMAT_VALIDATOR,
ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index d06424e..79b49e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.server.options.OptionManager;
public class MemoryAllocationUtilities {
@@ -40,7 +39,7 @@ public class MemoryAllocationUtilities {
* @param plan
* @param queryContext
*/
- public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+ public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
// Test plans may already have a pre-defined memory plan.
// Otherwise, determine memory allocation.
@@ -49,30 +48,30 @@ public class MemoryAllocationUtilities {
return;
}
// look for external sorts
- final List<ExternalSort> sortList = new LinkedList<>();
+ final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
for (final PhysicalOperator op : plan.getSortedOperators()) {
- if (op instanceof ExternalSort) {
- sortList.add((ExternalSort) op);
+ if ( op.isBufferedOperator() ) {
+ bufferedOpList.add(op);
}
}
// if there are any sorts, compute the maximum allocation, and set it on them
- if (sortList.size() > 0) {
+ if (bufferedOpList.size() > 0) {
final OptionManager optionManager = queryContext.getOptions();
final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
maxAllocPerNode = Math.min(maxAllocPerNode,
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
- final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
- logger.debug("Max sort alloc: {}", maxSortAlloc);
+ final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
+ logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);
- for(final ExternalSort externalSort : sortList) {
+ for(final PhysicalOperator op : bufferedOpList) {
// Ensure that the sort receives the minimum memory needed to make progress.
// Without this, the math might work out to allocate too little memory.
- long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation());
- externalSort.setMaxAllocation(alloc);
+ long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
+ op.setMaxAllocation(alloc);
}
}
plan.getProperties().hasResourcePlan = true;
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 5e5fef0..62c2307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -433,7 +433,7 @@ public class Foreman implements Runnable {
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
- MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+ MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
//Marking endTime of Planning
queryManager.markPlanningEndTime();
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 7ffb224..2f945d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -97,7 +97,7 @@ public class PlanSplitter {
throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
}
- MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext);
+ MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index c2a2bf0..8aedaf6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -207,6 +207,33 @@ drill.exec: {
// java ... -ea -Ddrill.exec.debug.validate_vectors=true ...
validate_vectors: false
},
+ spill: {
+ // *** Options common to all the operators that may spill
+ // File system to use. Local file system by default.
+ fs: "file:///",
+ // List of directories to use. Directories are created
+ // if they do not exist.
+ directories: [ "/tmp/drill/spill" ]
+ },
+ hashagg: {
+ // An internal tuning; should not be changed
+ min_batches_per_partition: 3,
+ // An option for testing - force a memory limit
+ mem_limit: 0,
+ // The max number of partitions in each hashagg operator
+ // This number is tuned down when memory is limited
+ // Setting it to 1 means: No spilling
+ num_partitions: 32,
+ spill: {
+ // -- The 2 options below can be used to override the common ones
+ // -- (common to all spilling operators)
+ // File system to use. Local file system by default.
+ fs: ${drill.exec.spill.fs},
+ // List of directories to use. Directories are created
+ // if they do not exist.
+ directories: ${drill.exec.spill.directories},
+ }
+ },
sort: {
purge.threshold : 1000,
external: {
@@ -232,11 +259,15 @@ drill.exec: {
group.size: 40000,
// Deprecated for managed xsort; used only by legacy xsort
threshold: 40000,
+ // -- The two options below can be used to override the options common
+ // -- for all spilling operators (see "spill" above).
+ // -- This is done for backward compatibility; in the future they
+ // -- may be deprecated (prefer setting only the common ones)
// File system to use. Local file system by default.
- fs: "file:///"
+ fs: ${drill.exec.spill.fs},
// List of directories to use. Directories are created
// if they do not exist.
- directories: [ "/tmp/drill/spill" ],
+ directories: ${drill.exec.spill.directories},
// Size of the batches written to, and read from, the spill files.
// Determines the ratio of memory to input data size for a single-
// generation sort. Smaller values give larger ratios, but at a
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 27df710..1a4d63b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -211,12 +211,13 @@ public class TestBugFixes extends BaseTestQuery {
int limit = 65536;
ImmutableList.Builder<Map<String, Object>> baselineBuilder = ImmutableList.builder();
for (int i = 0; i < limit; i++) {
- baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", String.valueOf(i + 1)));
+ baselineBuilder.add(Collections.<String, Object>singletonMap("`id`", /*String.valueOf */ (i + 1)));
}
List<Map<String, Object>> baseline = baselineBuilder.build();
testBuilder()
- .sqlQuery(String.format("select id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id limit %s", TEST_RES_PATH, limit))
+ .sqlQuery(String.format("select cast(id as int) as id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id order by 1 limit %s",
+ TEST_RES_PATH, limit))
.unOrdered()
.baselineRecords(baseline)
.go();
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index 66b7571..f15e757 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
* any particular order of execution. We ignore the results.
*/
public class TestTpchDistributedConcurrent extends BaseTestQuery {
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // Longer timeout than usual.
/*
* Valid test names taken from TestTpchDistributed. Fuller path prefixes are
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
new file mode 100644
index 0000000..fe6fcbc
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.agg;
+
+import ch.qos.logback.classic.Level;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.FixtureBuilder;
+import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.ProfileParser;
+import org.apache.drill.test.QueryBuilder;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test spilling for the Hash Aggr operator (using the mock reader)
+ */
+public class TestHashAggrSpill extends BaseTestQuery {
+
+  /**
+   * Runs the query, checks the returned row count (when {@code expectedRows > 0}), and
+   * verifies the spill metrics reported in the query profile by the first Hash Aggregate
+   * operator.
+   *
+   * @param client client used to run the query and to fetch its profile
+   * @param sql the query to run
+   * @param expectedRows expected number of result rows; a value of 0 or less skips the check
+   * @param spillCycle expected SPILL_CYCLE metric of the first Hash Aggregate operator
+   * @param spilledPartitions expected SPILLED_PARTITIONS metric of the first Hash Aggregate operator
+   * @throws Exception if running the query or parsing its profile fails
+   */
+  private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception {
+    QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
+    if (expectedRows > 0) {
+      assertEquals(expectedRows, summary.recordCount());
+    }
+
+    ProfileParser profile = client.parseProfile(summary.queryIdString());
+    List<ProfileParser.OperatorProfile> ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE);
+    assertTrue("No Hash Aggregate operator found in the query profile", !ops.isEmpty());
+
+    // Only the metrics of the first Hash Aggregate operator are checked
+    ProfileParser.OperatorProfile hag0 = ops.get(0);
+    long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
+    assertEquals(spillCycle, opCycle);
+    long opSpilledPartitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
+    assertEquals(spilledPartitions, opSpilledPartitions);
+  }
+
+  /**
+   * Tests "normal" spilling: spilled partitions are read back without spilling again
+   * (spill cycle 1). Expects the first Hash Aggregate operator to report spill cycle 1
+   * and one spilled partition.
+   *
+   * @throws Exception if the cluster cannot be started or the query fails
+   */
+  @Test
+  public void testHashAggrSpill() throws Exception {
+    LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+        .toConsole()
+        .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG);
+
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.HASHAGG_MAX_MEMORY, 74_000_000)
+        .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS, 16)
+        .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION, 3)
+        .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY, true)
+        .maxParallelization(2)
+        .saveProfiles();
+    try (LogFixture logs = logBuilder.build();
+        ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+      runAndDump(client, sql, 1_200_000, 1, 1);
+    }
+  }
+
+  /**
+   * Tests "secondary" and "tertiary" spilling: some spilled partitions spill again as
+   * they are read back, and in turn cause further spilling. Expects the first Hash
+   * Aggregate operator to report spill cycle 3 and two spilled partitions.
+   *
+   * @throws Exception if the cluster cannot be started or the query fails
+   */
+  @Test
+  public void testHashAggrSecondaryTertiarySpill() throws Exception {
+    LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
+        .toConsole()
+        .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
+        .logger("org.apache.drill.exec.cache", Level.INFO);
+
+    FixtureBuilder builder = ClusterFixture.builder()
+        .configProperty(ExecConstants.HASHAGG_MAX_MEMORY, 58_000_000)
+        .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS, 16)
+        .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION, 3)
+        .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY, true)
+        .sessionOption(PlannerSettings.STREAMAGG.getOptionName(), false)
+        .maxParallelization(1)
+        .saveProfiles();
+    try (LogFixture logs = logBuilder.build();
+        ClusterFixture cluster = builder.build();
+        ClientFixture client = cluster.clientFixture()) {
+      String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i";
+      runAndDump(client, sql, 1_100_000, 3, 2);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
index e39a644..66588b1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.TopN;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.junit.Ignore;
import org.junit.Test;
@@ -125,7 +126,7 @@ public class BasicPhysicalOpUnitTest extends PhysicalOpUnitTestBase {
@Test
public void testSimpleHashAgg() {
- HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
+ HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f);
List<String> inputJsonBatches = Lists.newArrayList(
"[{\"a\": 5, \"b\" : 1 }]",
"[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]");
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index cb0c517..eecbdfa 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -119,6 +119,7 @@
<exclude>**/.checkstyle</exclude>
<exclude>**/.buildpath</exclude>
<exclude>**/*.json</exclude>
+ <exclude>**/*.iml</exclude>
<exclude>**/git.properties</exclude>
<exclude>**/donuts-output-data.txt</exclude>
<exclude>**/*.tbl</exclude>
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
----------------------------------------------------------------------
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index aa713f8..deed7a7 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -192,8 +192,8 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
r.pBody, r.dBodies);
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Adding message to outbound buffer. {}", outMessage);
+ logger.debug("Sending response with Sender {}", System.identityHashCode(this));
}
- logger.debug("Sending response with Sender {}", System.identityHashCode(this));
connection.getChannel().writeAndFlush(outMessage);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 105ea47..581a9f8 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -377,6 +377,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached.");
}
+ logger.trace("Reallocating VarChar, new size {}",newAllocationSize);
final DrillBuf newBuf = allocator.buffer((int)newAllocationSize);
newBuf.setBytes(0, data, 0, data.capacity());
data.release();
[2/2] drill git commit: DRILL-5457: Spill implementation for Hash
Aggregate
Posted by pr...@apache.org.
DRILL-5457: Spill implementation for Hash Aggregate
closes #822
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c16e5f80
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c16e5f80
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c16e5f80
Branch: refs/heads/master
Commit: c16e5f8072f3e5d18157767143f9ccc7669c4380
Parents: be43a9e
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jun 19 19:04:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Tue Jun 20 17:01:01 2017 -0700
----------------------------------------------------------------------
.../src/resources/drill-override-example.conf | 22 +
.../org/apache/drill/exec/ExecConstants.java | 22 +
.../cache/VectorAccessibleSerializable.java | 56 +
.../drill/exec/physical/base/AbstractBase.java | 28 +-
.../exec/physical/base/PhysicalOperator.java | 15 +
.../exec/physical/config/ExternalSort.java | 17 +-
.../exec/physical/config/HashAggregate.java | 25 +-
.../physical/impl/aggregate/HashAggBatch.java | 46 +-
.../impl/aggregate/HashAggTemplate.java | 1113 +++++++++++++++---
.../physical/impl/aggregate/HashAggregator.java | 19 +-
.../impl/aggregate/SpilledRecordbatch.java | 175 +++
.../physical/impl/common/ChainedHashTable.java | 10 +-
.../exec/physical/impl/common/HashTable.java | 26 +-
.../physical/impl/common/HashTableStats.java | 7 +
.../physical/impl/common/HashTableTemplate.java | 255 ++--
.../exec/physical/impl/join/HashJoinBatch.java | 5 +-
.../physical/impl/spill/RecordBatchSizer.java | 78 +-
.../exec/physical/impl/spill/SpillSet.java | 59 +-
.../impl/xsort/managed/ExternalSortBatch.java | 6 +-
.../exec/planner/physical/AggPrelBase.java | 2 +-
.../exec/planner/physical/AggPruleBase.java | 3 +
.../exec/planner/physical/HashAggPrel.java | 2 +-
.../exec/planner/physical/PlannerSettings.java | 5 +
.../apache/drill/exec/record/RecordBatch.java | 2 +-
.../server/options/SystemOptionManager.java | 4 +
.../exec/util/MemoryAllocationUtilities.java | 21 +-
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../drill/exec/work/user/PlanSplitter.java | 2 +-
.../src/main/resources/drill-module.conf | 35 +-
.../java/org/apache/drill/TestBugFixes.java | 5 +-
.../drill/TestTpchDistributedConcurrent.java | 2 +-
.../physical/impl/agg/TestHashAggrSpill.java | 141 +++
.../physical/unit/BasicPhysicalOpUnitTest.java | 3 +-
exec/jdbc/pom.xml | 1 +
.../java/org/apache/drill/exec/rpc/RpcBus.java | 2 +-
.../templates/VariableLengthVectors.java | 1 +
36 files changed, 1800 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index b9d09a8..8010f85 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -142,6 +142,13 @@ drill.exec: {
}
},
cache.hazel.subnets: ["*.*.*.*"],
+ spill: {
+ # These options are common to all spilling operators.
+ # They can be overridden per operator (but this is only for
+ # backward compatibility, and may be deprecated in the future).
+ directories : [ "/tmp/drill/spill" ],
+ fs : "file:///"
+ }
sort: {
purge.threshold : 100,
external: {
@@ -150,11 +157,26 @@ drill.exec: {
batch.size : 4000,
group.size : 100,
threshold : 200,
+ # The 2 options below override the common ones;
+ # they should be deprecated in the future.
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
}
},
+ hashagg: {
+ # The partitions divide the work inside the hashagg, to make
+ # spilling easier to handle. This initial figure is tuned down when
+ # memory is limited.
+ # Setting this option to 1 disables spilling.
+ num_partitions: 32,
+ spill: {
+ # The 2 options below override the common ones;
+ # they should be deprecated in the future.
+ directories : [ "/tmp/drill/spill" ],
+ fs : "file:///"
+ }
+ },
memory: {
top.max: 1000000000000,
operator: {
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 18f69d5..537377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -64,6 +64,12 @@ public interface ExecConstants {
String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
+ // Spill boot-time options common to all spilling operators.
+ // (Each individual operator may override these common options.)
+
+ // File system used for spilling, e.g. "file:///"
+ String SPILL_FILESYSTEM = "drill.exec.spill.fs";
+ // List of directories used for spilling, e.g. [ "/tmp/drill/spill" ]
+ String SPILL_DIRS = "drill.exec.spill.directories";
+
// External Sort Boot configuration
String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -86,6 +92,22 @@ public interface ExecConstants {
BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);
+  // Hash Aggregate Options
+  // Following this interface's convention, "drill.exec.*" names are boot-time config
+  // properties and the "_KEY" names ("exec.hashagg.*") are system/session option names.
+
+  String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
+  String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
+  LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means no spilling
+  String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
+  String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
+  LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);
+  // Min batches is used for tuning: each partition needs this many batches when planning the number
+  // of partitions, and this many are reserved when deciding whether the remaining memory is too small
+  // and requires a spill. A low value may OOM (e.g., when incoming rows become wider); higher values
+  // use fewer partitions but are safer.
+  String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
+  String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
+  LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3);
+  // Per-operator overrides of the common spill options (drill.exec.spill.*)
+  String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
+  String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
+
String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9d0182f..d569ae5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import com.codahale.metrics.MetricRegistry;
@@ -138,6 +140,60 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
va = container;
}
+  /**
+   * Reads one serialized batch from {@code input}, like the preceding read method.
+   * Unlike that method, it loads the deserialized vectors into the caller-supplied
+   * {@code myContainer} (via {@code transferIn}) instead of replacing it, so the caller's
+   * container object is preserved. On return, {@code myContainer} has its schema rebuilt
+   * and its record count set, and it is stored as the current vector accessible.
+   *
+   * @param myContainer container that receives the deserialized vectors
+   * @param input stream positioned at a serialized batch
+   * @throws IOException if reading from {@code input} fails; vectors already created for
+   *         this batch are released before the exception propagates
+   */
+  public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
+    final VectorContainer container = new VectorContainer();
+    final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+    recordCount = batchDef.getRecordCount();
+    if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
+      if (sv2 == null) {
+        sv2 = new SelectionVector2(allocator);
+      }
+      sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+      sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
+      svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+    }
+    final List<ValueVector> vectorList = Lists.newArrayList();
+    try {
+      for (SerializedField metaData : batchDef.getFieldList()) {
+        final int dataLength = metaData.getBufferLength();
+        final MaterializedField field = MaterializedField.create(metaData);
+        final DrillBuf buf = allocator.buffer(dataLength);
+        try {
+          buf.writeBytes(input, dataLength);
+          final ValueVector vector = TypeHelper.getNewVector(field, allocator);
+          // Track the vector before loading it, so it is released if load() fails
+          vectorList.add(vector);
+          vector.load(metaData, buf);
+        } finally {
+          // Drop this method's reference to the read buffer
+          buf.release();
+        }
+      }
+    } catch (IOException | RuntimeException e) {
+      // Release the vectors created so far; otherwise their buffers leak from the allocator
+      for (ValueVector created : vectorList) {
+        created.clear();
+      }
+      throw e;
+    }
+    container.addCollection(vectorList);
+    container.setRecordCount(recordCount);
+    myContainer.transferIn(container); // transfer the vectors
+    myContainer.buildSchema(svMode);
+    myContainer.setRecordCount(recordCount);
+    va = myContainer;
+  }
+
public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
writeToStream(output);
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a547e26..6f42250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.base;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -102,17 +104,31 @@ public abstract class AbstractBase implements PhysicalOperator{
this.cost = cost;
}
- // Not available. Presumably because Drill does not currently use
- // this value, though it does appear in some test physical plans.
-// public void setMaxAllocation(long alloc) {
-// maxAllocation = alloc;
-// }
-
@Override
public long getMaxAllocation() {
return maxAllocation;
}
+  /**
+   * Sets the maximum memory allocation for this operator. The base implementation only
+   * records the value; operators that manage their own memory (e.g. by spilling) may
+   * override it.
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+  }
+
+  /**
+   * Indicates whether this operator manages its own memory (including disk spilling).
+   * Any operator that supports spilling should override this method and return {@code true}.
+   *
+   * @return {@code false} for the base implementation
+   */
+  @Override @JsonIgnore
+  public boolean isBufferedOperator() { return false; }
+
@Override
public String getUserName() {
return userName;
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index b1954ca..980f32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
*/
public long getMaxAllocation();
+  /**
+   * Sets the maximum memory allocation for this operator.
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  public void setMaxAllocation(long maxAllocation);
+
+  /**
+   * Indicates whether this operator manages its own memory.
+   *
+   * @return True iff this operator manages its memory (including disk spilling)
+   */
+  @JsonIgnore
+  public boolean isBufferedOperator();
+
@JsonProperty("@id")
public int getOperatorId();
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 17848d0..cb9679d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -49,12 +49,19 @@ public class ExternalSort extends Sort {
return CoreOperatorType.EXTERNAL_SORT_VALUE;
}
- // Set here, rather than the base class, because this is the only
- // operator, at present, that makes use of the maximum allocation.
- // Remove this, in favor of the base class version, when Drill
- // sets the memory allocation for all operators.
-
+ /**
+ * Records the maximum memory allocation for this operator.
+ *
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
}
+
+ /**
+ * The External Sort operator supports spilling, so it manages its own memory.
+ *
+ * @return true
+ */
+ @Override
+ public boolean isBufferedOperator() { return true; }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index 4dafbe8..0614dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class);
+ private final AggPrelBase.OperatorPhase aggPhase;
private final List<NamedExpression> groupByExprs;
private final List<NamedExpression> aggrExprs;
@@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle {
@JsonCreator
public HashAggregate(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase,
@JsonProperty("keys") List<NamedExpression> groupByExprs,
@JsonProperty("exprs") List<NamedExpression> aggrExprs,
@JsonProperty("cardinality") float cardinality) {
super(child);
+ this.aggPhase = aggPhase;
this.groupByExprs = groupByExprs;
this.aggrExprs = aggrExprs;
this.cardinality = cardinality;
}
+ /** @return the aggregation phase passed to the constructor */
+ public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; }
+
public List<NamedExpression> getGroupByExprs() {
return groupByExprs;
}
@@ -69,7 +75,9 @@ public class HashAggregate extends AbstractSingle {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new HashAggregate(child, groupByExprs, aggrExprs, cardinality);
+ HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality);
+ // maxAllocation is not a constructor argument, so copy it to the new instance explicitly
+ newHAG.setMaxAllocation(getMaxAllocation());
+ return newHAG;
}
@Override
@@ -77,5 +85,18 @@ public class HashAggregate extends AbstractSingle {
return CoreOperatorType.HASH_AGGREGATE_VALUE;
}
-
+  /**
+   * Records the maximum memory allocation for this Hash Aggregate operator.
+   *
+   * @param maxAllocation The max memory allocation to be set
+   */
+  @Override
+  public void setMaxAllocation(long maxAllocation) {
+    this.maxAllocation = maxAllocation;
+  }
+
+  /**
+   * The Hash Aggregate operator supports spilling, so it manages its own memory.
+   *
+   * @return true
+   */
+  @Override
+  public boolean isBufferedOperator() { return true; }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index dc913b1..97e0599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -55,7 +56,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
@@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
private HashAggregator aggregator;
- private final RecordBatch incoming;
+ private RecordBatch incoming;
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds;
private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch
private final List<Comparator> comparators;
private BatchSchema incomingSchema;
+ private boolean wasKilled;
private final GeneratorMapping UPDATE_AGGR_INSIDE =
GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
super(popConfig, context);
this.incoming = incoming;
+ wasKilled = false;
final int numGrpByExprs = popConfig.getGroupByExprs().size();
comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs);
@@ -136,15 +138,36 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return IterOutcome.NONE;
}
- if (aggregator.buildComplete() && !aggregator.allFlushed()) {
- // aggregation is complete and not all records have been output yet
- return aggregator.outputCurrentBatch();
+ // if aggregation is complete and not all records have been output yet
+ if (aggregator.buildComplete() ||
+ // or: 1st phase need to return (not fully grouped) partial output due to memory pressure
+ aggregator.earlyOutput()) {
+ // then output the next batch downstream
+ HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
+ // if Batch returned, or end of data - then return the appropriate iter outcome
+ if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; }
+ if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
+ // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
+ incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
+ }
+
+ if (wasKilled) { // if kill() was called before, then finish up
+ aggregator.cleanup();
+ incoming.kill(false);
+ return IterOutcome.NONE;
}
- logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+ // Read and aggregate records
+ // ( may need to run again if the spilled partition that was read
+ // generated new partitions that were all spilled )
+ AggOutcome out;
+ do {
+ //
+ // Read incoming batches and process their records
+ //
+ out = aggregator.doWork();
+ } while (out == AggOutcome.CALL_WORK_AGAIN);
- AggOutcome out = aggregator.doWork();
- logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch (out) {
case CLEANUP_AND_RETURN:
container.zeroVectors();
@@ -153,6 +176,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
// fall through
case RETURN_OUTCOME:
return aggregator.getOutcome();
+
case UPDATE_AGGREGATOR:
context.fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged(
@@ -175,7 +199,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
* @return true if the aggregator was setup successfully. false if there was a failure.
*/
private boolean createAggregator() {
- logger.debug("Creating new aggregator.");
try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
@@ -198,7 +221,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
top.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// top.saveCodeForDebugging(true);
+ // top.saveCodeForDebugging(true);
container.clear();
@@ -266,7 +289,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
agg.setup(popConfig, htConfig, context, this.stats,
- oContext.getAllocator(), incoming, this,
+ oContext, incoming, this,
aggrExprs,
cgInner.getWorkspaceTypes(),
groupByOutFieldIds,
@@ -314,6 +337,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
@Override
protected void killIncoming(boolean sendUpstream) {
+ wasKilled = true; // remember the kill: the next call that reaches the wasKilled check cleans up the aggregator, kills incoming and returns NONE
incoming.kill(sendUpstream);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1615200..38f0222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,82 +18,155 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.inject.Named;
+import com.google.common.base.Stopwatch;
+
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+
+import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
+
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
import org.apache.drill.exec.record.MaterializedField;
+
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.BatchSchema;
+
import org.apache.drill.exec.record.VectorContainer;
+
+import org.apache.drill.exec.record.TypedFieldId;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+
import org.apache.drill.exec.vector.AllocationHelper;
+
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
+
import org.apache.drill.exec.vector.VariableWidthVector;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+
public abstract class HashAggTemplate implements HashAggregator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+ protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
-// private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
-// private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
- private static final int VARIABLE_WIDTH_VALUE_SIZE = 50;
+ private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
+ private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
private static final boolean EXTRA_DEBUG_1 = false;
private static final boolean EXTRA_DEBUG_2 = false;
-// private static final String TOO_BIG_ERROR =
-// "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
-// private boolean newSchema = false;
+ private static final boolean EXTRA_DEBUG_SPILL = false;
+
+ // Fields needed for partitioning (the groups into partitions)
+ private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
+ private int partitionMask; // numPartitions - 1
+ private int bitsInMask; // number of bits in the MASK
+ private int nextPartitionToReturn = 0; // which partition to return the next batch from
+ // The following members are used for logging, metrics, etc.
+ private int rowsInPartition = 0; // counts #rows in each partition
+ private int rowsNotSpilled = 0;
+ private int rowsSpilled = 0;
+ private int rowsSpilledReturned = 0;
+ private int rowsReturnedEarly = 0;
+
+ private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
+ private boolean is2ndPhase = false;
+ private boolean canSpill = true; // make it false in case can not spill
+ private ChainedHashTable baseHashTable;
+ private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
+ private int earlyPartition = 0; // which partition to return early
+
+ private long memoryLimit; // max memory to be used by this oerator
+ private long estMaxBatchSize = 0; // used for adjusting #partitions
+ private long estRowWidth = 0;
+ private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+ private long minBatchesPerPartition; // for tuning - num partitions and spill decision
+ private long plannedBatches = 0; // account for planned, but not yet allocated batches
+
private int underlyingIndex = 0;
private int currentIndex = 0;
private IterOutcome outcome;
-// private int outputCount = 0;
private int numGroupedRecords = 0;
- private int outBatchIndex = 0;
+ private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+
private int lastBatchOutputCount = 0;
private RecordBatch incoming;
-// private BatchSchema schema;
+ private BatchSchema schema;
private HashAggBatch outgoing;
private VectorContainer outContainer;
-// private FragmentContext context;
+
+ private FragmentContext context;
+ private OperatorContext oContext;
private BufferAllocator allocator;
-// private HashAggregate hashAggrConfig;
- private HashTable htable;
- private ArrayList<BatchHolder> batchHolders;
+ private HashTable htables[];
+ private ArrayList<BatchHolder> batchHolders[];
+ private int outBatchIndex[];
+
+ // For handling spilling
+ private SpillSet spillSet;
+ SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
+ private OutputStream outputStream[]; // an output stream for each spilled partition
+ private int spilledBatchesCount[]; // count number of batches spilled, in each partition
+ private String spillFiles[];
+ private int cycleNum = 0; // primary, secondary, tertiary, etc.
+ private int originalPartition = -1; // the partition a secondary reads from
+
+ private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
+
+ private ArrayList<SpilledPartition> spilledPartitionsList;
+ private int operatorId; // for the spill file name
+
private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
private IndexPointer outStartIdxHolder;
private IndexPointer outNumRecordsHolder;
private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
-
- ErrorCollector collector = new ErrorCollectorImpl();
+ private TypedFieldId[] groupByOutFieldIds;
private MaterializedField[] materializedValueFields;
private boolean allFlushed = false;
private boolean buildComplete = false;
+ private boolean handlingSpills = false; // True once starting to process spill files
private OperatorStats stats = null;
private HashTableStats htStats = new HashTableStats();
@@ -103,7 +176,15 @@ public abstract class HashAggTemplate implements HashAggregator {
NUM_BUCKETS,
NUM_ENTRIES,
NUM_RESIZING,
- RESIZING_TIME;
+ RESIZING_TIME,
+ NUM_PARTITIONS,
+ SPILLED_PARTITIONS, // number of partitions spilled to disk
+ SPILL_MB, // Number of MB of data spilled to disk. This amount is first written,
+ // then later re-read. So, disk I/O is twice this amount.
+ // For first phase aggr -- this is an estimate of the amount of data
+ // returned early (analogous to a spill in the 2nd phase).
+ SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+ ;
// duplicate for hash ag
@@ -121,7 +202,6 @@ public abstract class HashAggTemplate implements HashAggregator {
private int batchOutputCount = 0;
private int capacity = Integer.MAX_VALUE;
- private boolean allocatedNextBatch = false;
@SuppressWarnings("resource")
public BatchHolder() {
@@ -145,8 +225,8 @@ public abstract class HashAggTemplate implements HashAggregator {
if (vector instanceof FixedWidthVector) {
((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
} else if (vector instanceof VariableWidthVector) {
- ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
- HashTable.BATCH_SIZE);
+ // This case is never used .... a varchar falls under ObjectVector which is allocated on the heap !
+ ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE);
} else if (vector instanceof ObjectVector) {
((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
} else {
@@ -166,20 +246,23 @@ public abstract class HashAggTemplate implements HashAggregator {
}
private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
- updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+ try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); } // updateAggrValuesInternal now declares SchemaChangeException; it is rethrown unchecked (cause attached) since this method declares no checked exception
maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
return true;
}
private void setup() {
- setupInterior(incoming, outgoing, aggrValuesContainer);
+ try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} // setupInterior now declares SchemaChangeException; it is rethrown unchecked with the cause attached
}
private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
outStartIdxHolder.value = batchOutputCount;
outNumRecordsHolder.value = 0;
for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
- outputRecordValues(i, batchOutputCount);
+ try { outputRecordValues(i, batchOutputCount); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
if (EXTRA_DEBUG_2) {
logger.debug("Outputting values to output index: {}", batchOutputCount);
}
@@ -204,24 +287,23 @@ public abstract class HashAggTemplate implements HashAggregator {
@RuntimeOverridden
public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
- @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+ @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException { // empty placeholder marked @RuntimeOverridden (presumably replaced by generated code); may now throw SchemaChangeException, which setup() wraps
}
@RuntimeOverridden
- public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{ // empty placeholder marked @RuntimeOverridden (presumably replaced by generated code); called from updateAggrValues(), which wraps the exception
}
@RuntimeOverridden
- public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+ public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{ // empty placeholder marked @RuntimeOverridden (presumably replaced by generated code); called from outputValues(), which wraps the exception
}
}
-
@Override
public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
- OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
- VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException {
+ OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
+ VectorContainer outContainer) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -230,15 +312,34 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Wrong number of workspace variables.");
}
-// this.context = context;
+ this.context = context;
this.stats = stats;
- this.allocator = allocator;
+ this.allocator = oContext.getAllocator();
+ this.oContext = oContext;
this.incoming = incoming;
-// this.schema = incoming.getSchema();
this.outgoing = outgoing;
this.outContainer = outContainer;
+ this.operatorId = hashAggrConfig.getOperatorId();
+
+ is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+ isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+ canSpill = isTwoPhase; // single phase can not spill
+
+ // Typically for testing - force a spill after a partition has more than so many batches
+ minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+
+ // Set the memory limit
+ memoryLimit = allocator.getLimit();
+ // Optional configured memory limit, typically used only for testing.
+ long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+ if (configLimit > 0) {
+ logger.warn("Memory limit was changed to {}",configLimit);
+ memoryLimit = Math.min(memoryLimit, configLimit);
+ allocator.setLimit(memoryLimit); // enforce at the allocator
+ }
-// this.hashAggrConfig = hashAggrConfig;
+ // All the settings that require the number of partitions were moved into delayedSetup()
+ // which would be called later, after the actuall data first arrives
// currently, hash aggregation is only applicable if there are group-by expressions.
// For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
@@ -266,112 +367,278 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- ChainedHashTable ht =
+ spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+ baseHashTable =
new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
- this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+ this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
numGroupByOutFields = groupByOutFieldIds.length;
- batchHolders = new ArrayList<BatchHolder>();
- // First BatchHolder is created when the first put request is received.
doSetup(incoming);
}
+ /**
+ * Delayed setup: the parts of setup() that can only be done after actual data arrives in incoming.
+ * This data is used to compute the number of partitions.
+ * <p>
+ * The configured number of partitions is rounded up to a power of two (a configured value of 1
+ * disables spilling). When spilling is allowed, the count is halved while
+ * numPartitions * (estMaxBatchSize * minBatchesPerPartition + 8 MiB) exceeds the memory still
+ * available under memoryLimit, stopping once it drops to 1 (the 2nd phase then disables spilling).
+ * With a single partition the allocator limit is raised to AbstractBase.MAX_ALLOCATION.
+ * Finally the per-partition arrays, hash tables and (initially empty) batch-holder lists are created.
+ *
+ * @throws UserException if a hash table cannot be created (code generation or IO error)
+ * @throws IllegalStateException on an unexpected schema change while creating a hash table
+ */
+ private void delayedSetup() {
+
+ // Set the number of partitions from the configuration (raise to a power of two, if needed)
+ numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
+ if ( numPartitions == 1 ) {
+ canSpill = false;
+ logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1");
+ }
+ numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
+ if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+ else {
+ // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+ updateEstMaxBatchSize(incoming);
+ }
+ long memAvail = memoryLimit - allocator.getAllocatedMemory();
+ if ( !canSpill ) { // single phase, or spill disabled by configuration
+ numPartitions = 1; // single phase should use only a single partition (to save memory)
+ } else { // two phase
+ // Adjust down the number of partitions if needed - when the memory available can not hold as
+ // many batches (configurable option), plus overhead (e.g. hash table, links, hash values))
+ while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+ numPartitions /= 2;
+ if ( numPartitions < 2) {
+ if ( is2ndPhase ) {
+ canSpill = false; // 2nd phase needs at least 2 to make progress
+ logger.warn("Spilling was disabled - not enough memory available for internal partitioning");
+ }
+ break; // NOTE(review): in the 1st phase canSpill stays true with a single partition - confirm intended
+ }
+ }
+ }
+ logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+ numPartitions, canSpill ? "Can" : "Cannot");
+
+ // The following initial safety check should be revisited once we can lower the number of rows in a batch
+ // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+ if ( numPartitions == 1 ) {
+ // if too little memory - behave like the old code -- no memory limit for hash aggregate
+ allocator.setLimit(AbstractBase.MAX_ALLOCATION); // 10_000_000_000L
+ }
+ // Based on the number of partitions: Set the mask and bit count
+ partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+ bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+ // Create arrays (one entry per partition)
+ htables = new HashTable[numPartitions] ;
+ batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ; // unchecked cast: Java cannot create a generic array directly
+ outBatchIndex = new int[numPartitions] ;
+ outputStream = new OutputStream[numPartitions];
+ spilledBatchesCount = new int[numPartitions];
+ spillFiles = new String[numPartitions];
+ spilledPartitionsList = new ArrayList<SpilledPartition>();
+
+ plannedBatches = numPartitions; // each partition should allocate its first batch
+
+ // initialize every (per partition) entry in the arrays
+ for (int i = 0; i < numPartitions; i++ ) {
+ try {
+ this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+ this.htables[i].setMaxVarcharSize(maxColumnWidth);
+ } catch (ClassTransformationException e) {
+ throw UserException.unsupportedError(e)
+ .message("Code generation error - likely an error in the code.")
+ .build(logger);
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .message("IO Error while creating a hash table.")
+ .build(logger);
+ } catch (SchemaChangeException sce) {
+ throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
+ }
+ this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+ }
+ }
+ /**
+ * Get the new incoming: the source that replaces the original incoming while spilled
+ * partitions are read back like an "incoming" (the newIncoming field, a SpilledRecordbatch).
+ * HashAggBatch fetches it after outputCurrentBatch() returns AGG_RESTART.
+ * @return The (newly replaced) incoming; may be null, since the field starts out null
+ */
+ @Override
+ public RecordBatch getNewIncoming() { return newIncoming; }
+ /**
+ * Re-initialize the per-partition state so that processing can start over from a new incoming
+ * (after a spill, a spill file read back like an "incoming").
+ * <p>
+ * Points the base hash table and this.incoming at the new source, records the row count of its
+ * current batch, restarts output at partition 0, and for every partition re-initializes the hash
+ * table, clears and replaces the batch holders, and resets the output batch index, output stream,
+ * spilled batch count and spill file name.
+ *
+ * @param newIncoming the new source of batches (shadows the field of the same name)
+ * @throws SchemaChangeException declared; the throwing call is not visible in this excerpt
+ * @throws IOException declared; the throwing call is not visible in this excerpt
+ */
+ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
+ baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+ this.incoming = newIncoming;
+ currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
+ nextPartitionToReturn = 0;
+ for (int i = 0; i < numPartitions; i++ ) {
+ htables[i].reinit(newIncoming);
+ if ( batchHolders[i] != null) {
+ for (BatchHolder bh : batchHolders[i]) {
+ bh.clear();
+ }
+ batchHolders[i].clear();
+ batchHolders[i] = new ArrayList<BatchHolder>();
+ }
+ outBatchIndex[i] = 0;
+ outputStream[i] = null; // NOTE(review): dropped without close() - confirm any open stream was closed earlier
+ spilledBatchesCount[i] = 0;
+ spillFiles[i] = null;
+ }
+ }
+
+ /**
+ * Update the estimated max batch size to be used in the Hash Aggr Op,
+ * using the record batch size to get the row width.
+ * <p>
+ * Returns immediately once estMaxBatchSize is already positive, so a later schema or varchar
+ * width change is not reflected. An empty batch only carries the schema, so the standard row
+ * width is used; otherwise the measured net row width (each varchar capped at 50). The estimate
+ * is estRowWidth * MAX_BATCH_SIZE. maxColumnWidth is set to the sizer's maxSize() clamped to
+ * [VARIABLE_MIN_WIDTH_VALUE_SIZE, VARIABLE_MAX_WIDTH_VALUE_SIZE]. A warning is logged when the
+ * estimate exceeds memoryLimit.
+ * @param incoming the batch to measure (shadows the field of the same name)
+ */
+ private void updateEstMaxBatchSize(RecordBatch incoming) {
+ if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
+ RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+ logger.trace("Incoming sizer: {}",sizer);
+ // An empty batch only has the schema, can not tell actual length of varchars
+ // else use the actual varchars length, each capped at 50 (to match the space allocation)
+ estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+ estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+
+ // Get approx max (varchar) column width to get better memory allocation
+ maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+ maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
+
+ logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
+ isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+
+ if ( estMaxBatchSize > memoryLimit ) {
+ logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+ }
+ }
+
+ /**
+ * Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+ * Once complete, get the incoming NEXT batch and process it as well, etc.
+ * For 1st phase, may return when an early output needs to be performed.
+ * <p>
+ * The first time a non-empty batch is seen, its schema is recorded and delayedSetup() is called
+ * to choose the number of partitions. The next batch is read from the upstream operator via
+ * outgoing.next(0, incoming) (growing estMaxBatchSize if that batch is larger), or, once spill
+ * files are being processed (handlingSpills), directly from incoming.next(), replaced by STOP
+ * when the fragment context says not to continue.
+ *
+ * @return Agg outcome status:
+ * RETURN_OUTCOME after an early output; when upstream returns OUT_OF_MEMORY or NOT_YET
+ * (left in {@code outcome}); or on NONE after the first output batch is produced
+ * ({@code outcome} becomes OK unless that output returned AGG_NONE, in which case it stays NONE).
+ * CALL_WORK_AGAIN on NONE when outputting the first batch returned AGG_RESTART
+ * (all new partitions were spilled, so a spilled partition must be read next).
+ * UPDATE_AGGREGATOR on OK_NEW_SCHEMA, after cleanup().
+ * CLEANUP_AND_RETURN on STOP or any other upstream outcome.
+ */
@Override
public AggOutcome doWork() {
- try {
- // Note: Keeping the outer and inner try blocks here to maintain some similarity with
- // StreamingAggregate which does somethings conditionally in the outer try block.
- // In the future HashAggregate may also need to perform some actions conditionally
- // in the outer try block.
-
- outside:
- while (true) {
- // loop through existing records, aggregating the values as necessary.
- if (EXTRA_DEBUG_1) {
- logger.debug("Starting outer loop of doWork()...");
+
+ while (true) {
+
+ // This would be called only once - first time actual data arrives on incoming
+ if ( schema == null && incoming.getRecordCount() > 0 ) {
+ this.schema = incoming.getSchema();
+ currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+ // Calculate the number of partitions based on actual incoming data
+ delayedSetup();
+ }
+
+ //
+ // loop through existing records in this batch, aggregating the values as necessary.
+ //
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Starting outer loop of doWork()...");
+ }
+ for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
}
- for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if (EXTRA_DEBUG_2) {
- logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
- }
- checkGroupAndAggrValues(currentIndex);
+ checkGroupAndAggrValues(currentIndex);
+ // If adding a group discovered a memory pressure during 1st phase, then start
+ // outputting some partition downstream in order to free memory.
+ if ( earlyOutput ) {
+ outputCurrentBatch(); // NOTE(review): the returned AggIterOutcome is ignored here - confirm intended
+ incIndex(); // next time continue with the next incoming row
+ return AggOutcome.RETURN_OUTCOME;
}
+ }
+
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Processed {} records", underlyingIndex);
+ }
- if (EXTRA_DEBUG_1) {
- logger.debug("Processed {} records", underlyingIndex);
+ // Cleanup the previous batch since we are done processing it.
+ for (VectorWrapper<?> v : incoming) {
+ v.getValueVector().clear();
+ }
+ //
+ // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+ // from one of the spill files (The spill case is handled differently here to avoid
+ // collecting stats on the spilled records)
+ //
+ if ( handlingSpills ) {
+ outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+ } else {
+ long beforeAlloc = allocator.getAllocatedMemory();
+
+ // Get the next RecordBatch from the incoming (i.e. upstream operator)
+ outcome = outgoing.next(0, incoming);
+
+ // If incoming batch is bigger than our estimate - adjust the estimate to match
+ long afterAlloc = allocator.getAllocatedMemory();
+ long incomingBatchSize = afterAlloc - beforeAlloc;
+ if ( estMaxBatchSize < incomingBatchSize) {
+ logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
+ estMaxBatchSize = incomingBatchSize;
}
+ }
- try {
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received IterOutcome of {}", outcome);
+ }
- while (true) {
- // Cleanup the previous batch since we are done processing it.
- for (VectorWrapper<?> v : incoming) {
- v.getValueVector().clear();
- }
- IterOutcome out = outgoing.next(0, incoming);
- if (EXTRA_DEBUG_1) {
- logger.debug("Received IterOutcome of {}", out);
- }
- switch (out) {
- case OUT_OF_MEMORY:
- case NOT_YET:
- this.outcome = out;
- return AggOutcome.RETURN_OUTCOME;
-
- case OK_NEW_SCHEMA:
- if (EXTRA_DEBUG_1) {
- logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- }
-// newSchema = true;
- this.cleanup();
- // TODO: new schema case needs to be handled appropriately
- return AggOutcome.UPDATE_AGGREGATOR;
-
- case OK:
- resetIndex();
- if (incoming.getRecordCount() == 0) {
- continue;
- } else {
- checkGroupAndAggrValues(currentIndex);
- incIndex();
-
- if (EXTRA_DEBUG_1) {
- logger.debug("Continuing outside loop");
- }
- continue outside;
- }
-
- case NONE:
- // outcome = out;
-
- buildComplete = true;
-
- updateStats(htable);
-
- // output the first batch; remaining batches will be output
- // in response to each next() call by a downstream operator
-
- outputCurrentBatch();
-
- // return setOkAndReturn();
- return AggOutcome.RETURN_OUTCOME;
-
- case STOP:
- default:
- outcome = out;
- return AggOutcome.CLEANUP_AND_RETURN;
- }
+ // Handle various results from getting the next batch
+ switch (outcome) {
+ case OUT_OF_MEMORY:
+ case NOT_YET:
+ return AggOutcome.RETURN_OUTCOME; // outcome still holds the upstream OUT_OF_MEMORY / NOT_YET
+
+ case OK_NEW_SCHEMA:
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
}
+ this.cleanup();
+ // TODO: new schema case needs to be handled appropriately
+ return AggOutcome.UPDATE_AGGREGATOR;
- } finally {
- // placeholder...
- }
+ case OK:
+ currentBatchRecordCount = incoming.getRecordCount(); // size of next batch
+
+ resetIndex(); // initialize index (a new batch needs to be processed)
+
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Continue to start processing the next batch");
+ }
+ break; // leaves the switch; the enclosing while (true) then processes the new batch
+
+ case NONE:
+ resetIndex(); // initialize index (in case spill files need to be processed)
+
+ buildComplete = true;
+
+ updateStats(htables);
+
+ // output the first batch; remaining batches will be output
+ // in response to each next() call by a downstream operator
+ AggIterOutcome aggOutcome = outputCurrentBatch();
+
+ if ( aggOutcome == AggIterOutcome.AGG_RESTART ) {
+ // Output of first batch returned a RESTART (all new partitions were spilled)
+ return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
+ }
+
+ if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; } // a batch was output; otherwise outcome stays NONE
+
+ return AggOutcome.RETURN_OUTCOME;
+
+ case STOP:
+ default:
+ return AggOutcome.CLEANUP_AND_RETURN; // outcome keeps the upstream value (e.g. STOP)
}
- } finally {
}
}
+ /**
+ * Allocate space for the returned aggregate columns
+ * (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
+ * @param records
+ */
private void allocateOutgoing(int records) {
// Skip the keys and only allocate for outputting the workspace values
// (keys will be output through splitAndTransfer)
@@ -382,14 +649,8 @@ public abstract class HashAggTemplate implements HashAggregator {
while (outgoingIter.hasNext()) {
@SuppressWarnings("resource")
ValueVector vv = outgoingIter.next().getValueVector();
-// MajorType type = vv.getField().getType();
- /*
- * In build schema we use the allocation model that specifies exact record count
- * so we need to stick with that allocation model until DRILL-2211 is resolved. Using
- * 50 as the average bytes per value as is used in HashTable.
- */
- AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0);
+ AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
}
}
@@ -400,45 +661,82 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public int getOutputCount() {
- // return outputCount;
return lastBatchOutputCount;
}
@Override
public void cleanup() {
- if (htable != null) {
- htable.clear();
- htable = null;
+ if ( schema == null ) { return; } // not set up; nothing to clean
+ if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+ (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+ }
+ // clean (and deallocate) each partition
+ for ( int i = 0; i < numPartitions; i++) {
+ if (htables[i] != null) {
+ htables[i].clear();
+ htables[i] = null;
+ }
+ if ( batchHolders[i] != null) {
+ for (BatchHolder bh : batchHolders[i]) {
+ bh.clear();
+ }
+ batchHolders[i].clear();
+ batchHolders[i] = null;
+ }
+
+ // close the stream of, then delete, any (still active) output spill file
+ if ( outputStream[i] != null && spillFiles[i] != null) {
+ try {
+ outputStream[i].close();
+ outputStream[i] = null;
+ spillSet.delete(spillFiles[i]);
+ spillFiles[i] = null;
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i], e); // keep the cause in the log
+ }
+ }
}
+ // delete any spill file left in unread spilled partitions
+ while ( ! spilledPartitionsList.isEmpty() ) {
+ SpilledPartition sp = spilledPartitionsList.remove(0);
+ try {
+ spillSet.delete(sp.spillFile);
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile, e); // keep the cause in the log
+ }
+ }
+ // Close the incoming built from the spilled partition currently being read (if any)
+ if ( newIncoming != null ) { newIncoming.close(); }
+ spillSet.close(); // delete the spill directory(ies)
htIdxHolder = null;
materializedValueFields = null;
outStartIdxHolder = null;
outNumRecordsHolder = null;
+ }
- if (batchHolders != null) {
- for (BatchHolder bh : batchHolders) {
+ // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
+ // then reallocate them in pristine state to allow the partition to continue receiving rows
+ private void reinitPartition(int part) {
+ assert htables[part] != null;
+ htables[part].reset();
+ if ( batchHolders[part] != null) {
+ for (BatchHolder bh : batchHolders[part]) {
bh.clear();
}
- batchHolders.clear();
- batchHolders = null;
+ batchHolders[part].clear();
}
+ batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
}
-// private final AggOutcome setOkAndReturn() {
-// this.outcome = IterOutcome.OK;
-// for (VectorWrapper<?> v : outgoing) {
-// v.getValueVector().getMutator().setValueCount(outputCount);
-// }
-// return AggOutcome.RETURN_OUTCOME;
-// }
-
private final void incIndex() {
underlyingIndex++;
- if (underlyingIndex >= incoming.getRecordCount()) {
+ if (underlyingIndex >= currentBatchRecordCount) { // past the last row of the current batch: mark the index exhausted
currentIndex = Integer.MAX_VALUE;
return;
}
- currentIndex = getVectorIndex(underlyingIndex);
+ try { currentIndex = getVectorIndex(underlyingIndex); } // getVectorIndex() now declares SchemaChangeException
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} // rethrown unchecked; incIndex() declares no checked exceptions
}
private final void resetIndex() {
@@ -446,71 +744,337 @@ public abstract class HashAggTemplate implements HashAggregator {
incIndex();
}
- private void addBatchHolder() {
+ private boolean isSpilled(int part) {
+ return outputStream[part] != null; // a partition counts as spilled while its spill output stream is open
+ }
+ /**
+ * Which partition to choose for flushing out (i.e. spill or return) ?
+ * - The current partition (to which a new batch holder is added) has a priority,
+ * because its last batch holder is full.
+ * - Also the largest prior spilled partition has some priority, as it is already spilled;
+ * but spilling too few rows (e.g. a single batch) gets us nothing.
+ * - So the largest non-spilled partition has some priority, to get more memory freed.
+ * Need to weigh the above three options.
+ *
+ * @param currPart - The partition that hit the memory limit (gets a priority)
+ * @return The partition (number) chosen to be spilled, or -1 when no candidate has more than one batch
+ */
+ private int chooseAPartitionToFlush(int currPart) {
+ if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+ int currPartSize = batchHolders[currPart].size();
+ if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+ // first find the largest spilled partition
+ int maxSizeSpilled = -1;
+ int indexMaxSpilled = -1;
+ for (int isp = 0; isp < numPartitions; isp++ ) {
+ if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+ maxSizeSpilled = batchHolders[isp].size();
+ indexMaxSpilled = isp;
+ }
+ }
+ // Give the current (if already spilled) some priority
+ if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+ maxSizeSpilled = currPartSize ;
+ indexMaxSpilled = currPart;
+ }
+ // now find the largest non-spilled partition
+ int maxSize = -1;
+ int indexMax = -1;
+ // Use the largest spilled (if found) as a base line, with a factor of 4
+ if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+ indexMax = indexMaxSpilled;
+ maxSize = 4 * maxSizeSpilled ;
+ }
+ for ( int insp = 0; insp < numPartitions; insp++) {
+ if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+ indexMax = insp;
+ maxSize = batchHolders[insp].size();
+ }
+ }
+ // again - priority to the current partition
+ if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+ return currPart;
+ }
+ if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
+ return -1; // try skipping this spill
+ }
+ return indexMax;
+ }
+
+ /**
+ * Iterate through the batches of the given partition, writing each one to the partition's spill file
+ *
+ * @param part The partition (number) to spill; its spill file and output stream are opened on its first spill
+ */
+ private void spillAPartition(int part) {
+
+ ArrayList<BatchHolder> currPartition = batchHolders[part];
+ rowsInPartition = 0;
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+ }
+
+ if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+ // If this is the first spill for this partition, create an output stream
+ if ( ! isSpilled(part) ) {
+
+ spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+ try {
+ outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("Hash Aggregation failed to open spill file: " + spillFiles[part])
+ .build(logger);
+ }
+ }
+
+ for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+ // get the number of records in the batch holder that are pending output
+ int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+ rowsInPartition += numPendingOutput; // for logging
+ rowsSpilled += numPendingOutput;
+
+ allocateOutgoing(numPendingOutput);
+
+ currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ int numOutputRecords = outNumRecordsHolder.value;
+
+ this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+ // set the value count for outgoing batch value vectors
+ /* int i = 0; (counter for the disabled debug printout below) */
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(numOutputRecords);
+ /*
+ // print out the first row to be spilled ( varchar, varchar, bigint )
+ try {
+ if (i++ < 2) {
+ NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ } else {
+ NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ }
+ } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
+ */
+ }
+
+ outContainer.setRecordCount(numPendingOutput); // NOTE(review): vectors above use numOutputRecords; presumably equal - confirm
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+ VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+ Stopwatch watch = Stopwatch.createStarted();
+ try {
+ outputBatch.writeToStream(outputStream[part]);
+ } catch (IOException ioe) {
+ throw UserException.dataWriteError(ioe)
+ .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+ .build(logger);
+ }
+ outContainer.zeroVectors();
+ logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+ }
+
+ spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+ logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+ }
+
+ private void addBatchHolder(int part) {
+ // Create a new BatchHolder, append it to the given partition's list, then set it up
BatchHolder bh = newBatchHolder();
- batchHolders.add(bh);
+ batchHolders[part].add(bh);
if (EXTRA_DEBUG_1) {
- logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
}
bh.setup();
}
- // Overridden in the generated class when created as plain Java code.
-
+ // These methods are overridden in the generated class when created as plain Java code.
protected BatchHolder newBatchHolder() {
return new BatchHolder();
}
+ /**
+ * Output the next batch from partition "nextPartitionToReturn" (or from "earlyPartition" during early output)
+ *
+ * @return AGG_OK when a batch was output, AGG_NONE at end of data, AGG_RESTART when a spilled partition must be read back (call doWork() again)
+ */
@Override
- public IterOutcome outputCurrentBatch() {
- if (outBatchIndex >= batchHolders.size()) {
- this.outcome = IterOutcome.NONE;
- return outcome;
+ public AggIterOutcome outputCurrentBatch() {
+
+ // when incoming was an empty batch, just finish up
+ if ( schema == null ) {
+ logger.trace("Incoming was empty; output is an empty batch.");
+ this.outcome = IterOutcome.NONE; // no records were read
+ allFlushed = true;
+ return AggIterOutcome.AGG_NONE;
}
- // get the number of records in the batch holder that are pending output
- int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
+ // Initialization (covers the case of early output)
+ ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
+ int currOutBatchIndex = outBatchIndex[earlyPartition];
+ int partitionToReturn = earlyPartition;
+
+ if ( ! earlyOutput ) {
+ // Update the next partition to return (if needed)
+ // skip fully returned (or spilled) partitions
+ while (nextPartitionToReturn < numPartitions) {
+ //
+ // If this partition was spilled - spill the rest of it and skip it
+ //
+ if ( isSpilled(nextPartitionToReturn) ) {
+ spillAPartition(nextPartitionToReturn); // spill the rest
+ SpilledPartition sp = new SpilledPartition();
+ sp.spillFile = spillFiles[nextPartitionToReturn];
+ sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
+ sp.cycleNum = cycleNum; // remember the current cycle
+ sp.origPartn = nextPartitionToReturn; // for debugging / filename
+ sp.prevOrigPartn = originalPartition; // for debugging / filename
+ spilledPartitionsList.add(sp);
+
+ reinitPartition(nextPartitionToReturn); // free the memory
+ long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
+ spillSet.tallyWriteBytes(posn); // for the IO stats
+ try {
+ outputStream[nextPartitionToReturn].close();
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("IO Error while closing output stream")
+ .build(logger);
+ }
+ outputStream[nextPartitionToReturn] = null;
+ }
+ else {
+ currPartition = batchHolders[nextPartitionToReturn];
+ currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
+ // If curr batch (partition X index) is not empty - proceed to return it
+ if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
+ break;
+ }
+ }
+ nextPartitionToReturn++; // else check next partition
+ }
+
+ // if passed the last partition - either done or need to restart and read spilled partitions
+ if (nextPartitionToReturn >= numPartitions) {
+ // The following "if" is probably never used; due to a similar check at the end of this method
+ if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+ allFlushed = true;
+ this.outcome = IterOutcome.NONE;
+ if ( is2ndPhase ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+ (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+ }
+ return AggIterOutcome.AGG_NONE; // then return NONE
+ }
+ // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
+ buildComplete = false; // go back and call doWork() again
+ handlingSpills = true; // beginning to work on the spill files
+ // pick a spilled partition; set a new incoming ...
+ SpilledPartition sp = spilledPartitionsList.remove(0);
+ // Create a new "incoming" out of the spilled partition spill file
+ newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
+ originalPartition = sp.origPartn; // used for the filename
+ logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
+ // Initialize .... new incoming, new set of partitions
+ try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); }
+ // update the cycle num if needed
+ // The current cycle num should always be one larger than in the spilled partition
+ if ( cycleNum == sp.cycleNum ) {
+ cycleNum = 1 + sp.cycleNum;
+ stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+ // report first spill or memory stressful situations
+ if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
+ if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
+ if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
+ if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
+ if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
+ }
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
+ sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
+ }
+ return AggIterOutcome.AGG_RESTART;
+ }
+
+ partitionToReturn = nextPartitionToReturn ;
- if (numPendingOutput == 0) {
- this.outcome = IterOutcome.NONE;
- return outcome;
}
+ // get the number of records in the batch holder that are pending output
+ int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+ // The following accounting is for logging, metrics, etc.
+ rowsInPartition += numPendingOutput ;
+ if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
+ else { rowsSpilledReturned += numPendingOutput; }
+ if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
+
allocateOutgoing(numPendingOutput);
- batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
int numOutputRecords = outNumRecordsHolder.value;
if (EXTRA_DEBUG_1) {
logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
}
- this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+ this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
// set the value count for outgoing batch value vectors
for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(numOutputRecords);
}
-// outputCount += numOutputRecords;
-
this.outcome = IterOutcome.OK;
- logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+ if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+ logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
+ rowsNotSpilled+rowsSpilledReturned,
+ rowsSpilled);
+ }
lastBatchOutputCount = numOutputRecords;
- outBatchIndex++;
- if (outBatchIndex == batchHolders.size()) {
- allFlushed = true;
+ outBatchIndex[partitionToReturn]++;
+ // if just flushed the last batch in the partition
+ if (outBatchIndex[partitionToReturn] == currPartition.size()) {
+
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
+ earlyOutput ? "(Early)" : "",
+ partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
+ }
+ rowsInPartition = 0; // reset to count for the next partition
+
+ // deallocate memory used by this partition, and re-initialize
+ reinitPartition(partitionToReturn);
- logger.debug("HashAggregate: All batches flushed.");
+ if ( earlyOutput ) {
- // cleanup my internal state since there is nothing more to return
- this.cleanup();
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
+ }
+ outBatchIndex[earlyPartition] = 0; // reset, for next time
+ earlyOutput = false ; // done with early output
+ }
+ else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+
+ allFlushed = true; // next next() call will return NONE
+
+ logger.trace("HashAggregate: All batches flushed.");
+
+ // cleanup my internal state since there is nothing more to return
+ this.cleanup();
+ }
}
- return this.outcome;
+ return AggIterOutcome.AGG_OK;
}
@Override
@@ -522,11 +1086,33 @@ public abstract class HashAggTemplate implements HashAggregator {
public boolean buildComplete() {
return buildComplete;
}
+ @Override
+ public boolean earlyOutput() { return earlyOutput; } // true while a 1st phase partition is being returned early to free memory
public int numGroupedRecords() {
return numGroupedRecords;
}
+ /**
+ * Generate a detailed error message in case of "Out Of Memory"
+ * @return the error message, ending with the allocator's memory limit and the memory allocated so far
+ */
+ private String getOOMErrorMsg() {
+ String errmsg;
+ if ( !isTwoPhase ) {
+ errmsg = "Single Phase Hash Aggregate operator can not spill." ;
+ } else if ( ! canSpill ) { // 2nd phase, with only 1 partition
+ errmsg = "Too little memory available to operator to facilitate spilling.";
+ } else { // spilling was possible, so an OOM here is unexpected (a bug ?)
+ errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+ ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+ if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
+ }
+ errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
+
+ return errmsg;
+ }
+
// Check if a group is present in the hash table; if not, insert it in the hash table.
// The htIdxHolder contains the index of the group in the hash table container; this same
// index is also used for the aggregation values maintained by the hash aggregate.
@@ -535,6 +1121,8 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Invalid incoming row index.");
}
+ assert ! earlyOutput;
+
/** for debugging
Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
BigIntVector vv0 = null;
@@ -546,44 +1134,189 @@ public abstract class HashAggTemplate implements HashAggregator {
holder.value = vv0.getAccessor().get(incomingRowIdx) ;
}
*/
+ /*
+ if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
+ // for debugging -- show the first row from a spilled batch
+ Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+ Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+ Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+
+ if (tmp0 != null && tmp1 != null && tmp2 != null) {
+ NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+ NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+ NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+ logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
+ }
+ }
+ */
+ // The hash code is computed once, then its lower bits are used to determine the
+ // partition to use, and the higher bits determine the location in the hash table.
+ int hashCode;
+ try {
+ htables[0].updateBatches();
+ hashCode = htables[0].getHashCode(incomingRowIdx);
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException("Unexpected schema change", e);
+ }
- htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
+ // right shift hash code for secondary (or tertiary...) spilling
+ for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
- int currentIdx = htIdxHolder.value;
+ int currentPartition = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+ HashTable.PutStatus putStatus = null;
+ long allocatedBefore = allocator.getAllocatedMemory();
- // get the batch index and index within the batch
- if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
- addBatchHolder();
+ // Insert the key columns into the hash table
+ try {
+ putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException("Unexpected schema change", e);
}
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
- int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+ int currentIdx = htIdxHolder.value;
- // Check if we have almost filled up the workspace vectors and add a batch if necessary
- if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
- htable.addNewKeyBatch();
- addBatchHolder();
- bh.allocatedNextBatch = true;
+ long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
+ if ( addedMem > 0 ) {
+ logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition);
}
+ // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch
+ // (for the aggregate columns) needs to be created
+ if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+ try {
+ long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+
+ addBatchHolder(currentPartition);
+
+ if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
+ long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
+ logger.trace("MEMORY CHECK AGG: added {} total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+ // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+ if ( totalAddedMem > estMaxBatchSize ) {
+ logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
+ estMaxBatchSize = totalAddedMem;
+ }
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+ }
+ }
+ BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+ int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
numGroupedRecords++;
}
+
+ // ===================================================================================
+ // If the last batch just became full - that is the time to check the memory limits !!
+ // If exceeded, then need to spill (if 2nd phase) or output early (1st)
+ // (Skip this if cannot spill; in such case an OOM may be encountered later)
+ // ===================================================================================
+ if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
+
+ plannedBatches++; // planning to allocate one more batch
+
+ // calculate the (max) new memory needed now
+ long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
+ for ( HashTable ht : htables ) {
+ hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize();
+ }
+
+ // Plan ahead for at least MIN batches, to account for size changing, and some overhead
+ long maxMemoryNeeded = minBatchesPerPartition * plannedBatches *
+ ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) +
+ hashTableDoublingSizeNeeded;
+
+ // log a detailed debug message explaining why a spill may be needed
+ logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
+ "Memory needed {}, Est batch size {}, mem limit {}",
+ allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
+ batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
+ //
+ // Spill if the allocated memory plus the memory needed exceeds the memory limit.
+ //
+ if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+
+ // Pick a "victim" partition to spill or return
+ int victimPartition = chooseAPartitionToFlush(currentPartition);
+
+ // In case no partition has more than one batch -- try and "push the limits"; maybe next
+ // time the spill could work.
+ if ( victimPartition < 0 ) { return; }
+
+ if ( is2ndPhase ) {
+ long before = allocator.getAllocatedMemory();
+
+ spillAPartition(victimPartition);
+ logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+ // Re-initialize (free memory, then recreate) the partition just spilled/returned
+ reinitPartition(victimPartition);
+
+ // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+ if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+ int victimPartition2 = chooseAPartitionToFlush(victimPartition);
+ if ( victimPartition2 < 0 ) { return; }
+ long after = allocator.getAllocatedMemory();
+ spillAPartition(victimPartition2);
+ reinitPartition(victimPartition2);
+ logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+ before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+ logger.trace("Second Partition Spilled: {}",victimPartition2);
+ }
+ }
+ else {
+ // 1st phase need to return a partition early in order to free some memory
+ earlyOutput = true;
+ earlyPartition = victimPartition;
+
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("picked partition {} for early output", victimPartition);
+ }
+ }
+ }
+ }
}
- private void updateStats(HashTable htable) {
- htable.getStats(htStats);
+ /**
+ * Updates the stats at the time after all the input was read.
+ * Note: For spilled partitions, their hash-table stats from before the spill are lost.
+ * And the SPILLED_PARTITIONS only counts the spilled partitions in the primary, not SECONDARY etc.
+ * @param htables the per-partition hash tables whose stats are summed into htStats
+ */
+ private void updateStats(HashTable[] htables) {
+ if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+ long numSpilled = 0;
+ HashTableStats newStats = new HashTableStats();
+ // sum the stats from all the partitions
+ for (int ind = 0; ind < numPartitions; ind++) {
+ htables[ind].getStats(newStats);
+ htStats.addStats(newStats); // NOTE(review): newStats is reused; assumes getStats() overwrites rather than accumulates - confirm
+ if (isSpilled(ind)) {
+ numSpilled++;
+ }
+ }
this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+ this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+ if ( is2ndPhase ) {
+ this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+ }
+ if ( rowsReturnedEarly > 0 ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early (reuses the SPILL_MB metric)
+ (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+ }
}
// Code-generated methods (implemented in HashAggBatch)
- public abstract void doSetup(@Named("incoming") RecordBatch incoming);
+ public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
- public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+ public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException; // incIndex() rethrows this as UnsupportedOperationException
- public abstract boolean resetValues();
+ public abstract boolean resetValues() throws SchemaChangeException;
}