Posted to commits@drill.apache.org by pr...@apache.org on 2017/06/21 00:19:13 UTC
[2/2] drill git commit: DRILL-5457: Spill implementation for Hash Aggregate
DRILL-5457: Spill implementation for Hash Aggregate
closes #822
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c16e5f80
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c16e5f80
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c16e5f80
Branch: refs/heads/master
Commit: c16e5f8072f3e5d18157767143f9ccc7669c4380
Parents: be43a9e
Author: Boaz Ben-Zvi <bo...@BBenZvi-E754-MBP13.local>
Authored: Mon Jun 19 19:04:30 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Tue Jun 20 17:01:01 2017 -0700
----------------------------------------------------------------------
.../src/resources/drill-override-example.conf | 22 +
.../org/apache/drill/exec/ExecConstants.java | 22 +
.../cache/VectorAccessibleSerializable.java | 56 +
.../drill/exec/physical/base/AbstractBase.java | 28 +-
.../exec/physical/base/PhysicalOperator.java | 15 +
.../exec/physical/config/ExternalSort.java | 17 +-
.../exec/physical/config/HashAggregate.java | 25 +-
.../physical/impl/aggregate/HashAggBatch.java | 46 +-
.../impl/aggregate/HashAggTemplate.java | 1113 +++++++++++++++---
.../physical/impl/aggregate/HashAggregator.java | 19 +-
.../impl/aggregate/SpilledRecordbatch.java | 175 +++
.../physical/impl/common/ChainedHashTable.java | 10 +-
.../exec/physical/impl/common/HashTable.java | 26 +-
.../physical/impl/common/HashTableStats.java | 7 +
.../physical/impl/common/HashTableTemplate.java | 255 ++--
.../exec/physical/impl/join/HashJoinBatch.java | 5 +-
.../physical/impl/spill/RecordBatchSizer.java | 78 +-
.../exec/physical/impl/spill/SpillSet.java | 59 +-
.../impl/xsort/managed/ExternalSortBatch.java | 6 +-
.../exec/planner/physical/AggPrelBase.java | 2 +-
.../exec/planner/physical/AggPruleBase.java | 3 +
.../exec/planner/physical/HashAggPrel.java | 2 +-
.../exec/planner/physical/PlannerSettings.java | 5 +
.../apache/drill/exec/record/RecordBatch.java | 2 +-
.../server/options/SystemOptionManager.java | 4 +
.../exec/util/MemoryAllocationUtilities.java | 21 +-
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../drill/exec/work/user/PlanSplitter.java | 2 +-
.../src/main/resources/drill-module.conf | 35 +-
.../java/org/apache/drill/TestBugFixes.java | 5 +-
.../drill/TestTpchDistributedConcurrent.java | 2 +-
.../physical/impl/agg/TestHashAggrSpill.java | 141 +++
.../physical/unit/BasicPhysicalOpUnitTest.java | 3 +-
exec/jdbc/pom.xml | 1 +
.../java/org/apache/drill/exec/rpc/RpcBus.java | 2 +-
.../templates/VariableLengthVectors.java | 1 +
36 files changed, 1800 insertions(+), 417 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/distribution/src/resources/drill-override-example.conf
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index b9d09a8..8010f85 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -142,6 +142,13 @@ drill.exec: {
}
},
cache.hazel.subnets: ["*.*.*.*"],
+ spill: {
+ # These options are common to all spilling operators.
+ # They can be overridden per operator (but this is only for
+ # backward compatibility, and may be deprecated in the future).
+ directories : [ "/tmp/drill/spill" ],
+ fs : "file:///"
+ }
sort: {
purge.threshold : 100,
external: {
@@ -150,11 +157,26 @@ drill.exec: {
batch.size : 4000,
group.size : 100,
threshold : 200,
+ # The two options below override the common ones;
+ # they should be deprecated in the future.
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
}
},
+ hashagg: {
+ # The partitions divide the work inside the hashagg, to simplify
+ # spill handling. This initial figure is tuned down when
+ # memory is limited.
+ # Setting this option to 1 disables spilling!
+ num_partitions: 32,
+ spill: {
+ # The two options below override the common ones;
+ # they should be deprecated in the future.
+ directories : [ "/tmp/drill/spill" ],
+ fs : "file:///"
+ }
+ },
memory: {
top.max: 1000000000000,
operator: {
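
As a rough illustration (not part of the patch), the new boot-time keys shown above can be read through the Typesafe Config API that Drill's DrillConfig wraps; the key names come from the example config, everything else here is assumed:

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    // Illustrative only: parse a fragment equivalent to the drill-override-example.conf snippet above.
    public class SpillOptionsSketch {
      public static void main(String[] args) {
        Config conf = ConfigFactory.parseString(
            "drill.exec.spill.directories = [ \"/tmp/drill/spill\" ]\n"
          + "drill.exec.spill.fs = \"file:///\"\n"
          + "drill.exec.hashagg.num_partitions = 32\n");
        System.out.println(conf.getStringList("drill.exec.spill.directories")); // [/tmp/drill/spill]
        System.out.println(conf.getString("drill.exec.spill.fs"));              // file:///
        System.out.println(conf.getInt("drill.exec.hashagg.num_partitions"));   // 32
      }
    }
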
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 18f69d5..537377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -64,6 +64,12 @@ public interface ExecConstants {
String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";
+ // Spill boot-time Options common to all spilling operators
+ // (Each individual operator may override the common options)
+
+ String SPILL_FILESYSTEM = "drill.exec.spill.fs";
+ String SPILL_DIRS = "drill.exec.spill.directories";
+
// External Sort Boot configuration
String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
@@ -86,6 +92,22 @@ public interface ExecConstants {
BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);
+ // Hash Aggregate Options
+
+ String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
+ String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
+ LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means - no spilling
+ String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
+ String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
+ LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);
+ // min batches per partition is used for tuning: when planning the number of partitions, each partition is assumed
+ // to need this many batches, and the same number is reserved when checking whether the remaining available memory
+ // is too small and a spill is required.
+ // A low value may OOM (e.g., when incoming rows become wider); higher values use fewer partitions but are safer.
+ String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
+ String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
+ LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3);
+ String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
+ String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
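
The min_batches_per_partition comment above describes a sizing trade-off. Purely as a worked example, the loop below reruns the same halving logic that delayedSetup() applies later in this diff, with made-up numbers (the memory and batch-size figures are assumptions):

    // Illustrative only: shrink the partition count until the partitions (plus overhead) fit in memory.
    public class PartitionTuningSketch {
      public static void main(String[] args) {
        long memAvail = 100L * 1024 * 1024;        // assume 100 MB available to the operator
        long estMaxBatchSize = 4L * 1024 * 1024;   // assume a 4 MB estimated max incoming batch
        long minBatchesPerPartition = 3;           // default of the validator above
        int numPartitions = 32;                    // exec.hashagg.num_partitions default

        while (numPartitions * (estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail) {
          numPartitions /= 2;                      // halve until the partitions fit
          if (numPartitions < 2) { break; }        // below 2, the 2nd phase cannot make progress by spilling
        }
        System.out.println(numPartitions);         // prints 4 for these numbers
      }
    }
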
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9d0182f..d569ae5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import com.codahale.metrics.MetricRegistry;
@@ -138,6 +140,60 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
va = container;
}
+ // Like readFromStream() above, but preserves the original container and its list of value vectors
+ public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
+ final VectorContainer container = new VectorContainer();
+ final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
+ recordCount = batchDef.getRecordCount();
+ if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
+
+ if (sv2 == null) {
+ sv2 = new SelectionVector2(allocator);
+ }
+ sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
+ sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
+ svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
+ }
+ final List<ValueVector> vectorList = Lists.newArrayList();
+ final List<SerializedField> fieldList = batchDef.getFieldList();
+ for (SerializedField metaData : fieldList) {
+ final int dataLength = metaData.getBufferLength();
+ final MaterializedField field = MaterializedField.create(metaData);
+ final DrillBuf buf = allocator.buffer(dataLength);
+ final ValueVector vector;
+ try {
+ buf.writeBytes(input, dataLength);
+ vector = TypeHelper.getNewVector(field, allocator);
+ vector.load(metaData, buf);
+ } finally {
+ buf.release();
+ }
+ vectorList.add(vector);
+ }
+ container.addCollection(vectorList);
+ container.setRecordCount(recordCount);
+ myContainer.transferIn(container); // transfer the vectors
+ myContainer.buildSchema(svMode);
+ myContainer.setRecordCount(recordCount);
+ /*
+ // for debugging -- show values from the first row
+ Object tmp0 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+ Object tmp1 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+ Object tmp2 = (myContainer).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+ if (tmp0 != null && tmp1 != null && tmp2 != null) {
+ NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+ NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+ NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+
+ try {
+ logger.info("HASH AGG: Got a row = {} , {} , {}", vv0.getAccessor().get(0), vv1.getAccessor().get(0), vv2.getAccessor().get(0));
+ } catch (Exception e) { logger.info("HASH AGG: Got an exception = {}",e); }
+ }
+ else { logger.info("HASH AGG: got nulls !!!"); }
+ */
+ va = myContainer;
+ }
+
public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
writeToStream(output);
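
A minimal usage sketch of the new readFromStreamWithContainer() method, roughly what the new SpilledRecordbatch does internally. The helper class and the SpillSet reader call are assumptions; only the deserialization call is taken from the hunk above:

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.drill.exec.cache.VectorAccessibleSerializable;
    import org.apache.drill.exec.memory.BufferAllocator;
    import org.apache.drill.exec.physical.impl.spill.SpillSet;
    import org.apache.drill.exec.record.VectorContainer;

    // Hypothetical helper, not part of the patch: read one spilled batch back into a caller-owned container.
    class SpillReadSketch {
      static int readOneBatch(BufferAllocator allocator, SpillSet spillSet,
                              String spillFile, VectorContainer container) throws IOException {
        VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
        try (InputStream in = spillSet.openForInput(spillFile)) { // assumed SpillSet reader API
          vas.readFromStreamWithContainer(container, in);         // refills 'container' in place
        }
        return container.getRecordCount();
      }
    }
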
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index a547e26..6f42250 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.base;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -102,17 +104,31 @@ public abstract class AbstractBase implements PhysicalOperator{
this.cost = cost;
}
- // Not available. Presumably because Drill does not currently use
- // this value, though it does appear in some test physical plans.
-// public void setMaxAllocation(long alloc) {
-// maxAllocation = alloc;
-// }
-
@Override
public long getMaxAllocation() {
return maxAllocation;
}
+ /**
+ * Any operator that supports spilling should override this method
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
+ public void setMaxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
+ }
+
+ /**
+ * Any operator that supports spilling should override this method (and return true)
+ * @return false
+ */
+ @Override @JsonIgnore
+ public boolean isBufferedOperator() { return false; }
+
+ // @Override
+ // public void setBufferedOperator(boolean bo) {}
+
@Override
public String getUserName() {
return userName;
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index b1954ca..980f32c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
*/
public long getMaxAllocation();
+ /**
+ *
+ * @param maxAllocation The max memory allocation to be set
+ */
+ public void setMaxAllocation(long maxAllocation);
+
+ /**
+ *
+ * @return True iff this operator manages its memory (including disk spilling)
+ */
+ @JsonIgnore
+ public boolean isBufferedOperator();
+
+ // public void setBufferedOperator(boolean bo);
+
@JsonProperty("@id")
public int getOperatorId();
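
A hedged sketch of how a planning utility might use the two new methods; the real logic lives in MemoryAllocationUtilities (also touched by this commit), and the equal-share split below is only an assumption for illustration:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.drill.exec.physical.base.PhysicalOperator;

    // Illustrative only: give each buffered (spilling) operator an equal share of an assumed per-query budget.
    class BufferedMemorySketch {
      static void assignMemory(List<PhysicalOperator> operators, long queryMemoryBudget) {
        List<PhysicalOperator> buffered = new ArrayList<>();
        for (PhysicalOperator op : operators) {
          if (op.isBufferedOperator()) {       // new method introduced by this commit
            buffered.add(op);
          }
        }
        if (buffered.isEmpty()) { return; }
        long perOperator = queryMemoryBudget / buffered.size();
        for (PhysicalOperator op : buffered) {
          op.setMaxAllocation(perOperator);    // new setter introduced by this commit
        }
      }
    }
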
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 17848d0..cb9679d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -49,12 +49,19 @@ public class ExternalSort extends Sort {
return CoreOperatorType.EXTERNAL_SORT_VALUE;
}
- // Set here, rather than the base class, because this is the only
- // operator, at present, that makes use of the maximum allocation.
- // Remove this, in favor of the base class version, when Drill
- // sets the memory allocation for all operators.
-
+ /**
+ *
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
}
+
+ /**
+ * The External Sort operator supports spilling
+ * @return true
+ */
+ @Override
+ public boolean isBufferedOperator() { return true; }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
index 4dafbe8..0614dc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java
@@ -21,6 +21,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class);
+ private final AggPrelBase.OperatorPhase aggPhase;
private final List<NamedExpression> groupByExprs;
private final List<NamedExpression> aggrExprs;
@@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle {
@JsonCreator
public HashAggregate(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase,
@JsonProperty("keys") List<NamedExpression> groupByExprs,
@JsonProperty("exprs") List<NamedExpression> aggrExprs,
@JsonProperty("cardinality") float cardinality) {
super(child);
+ this.aggPhase = aggPhase;
this.groupByExprs = groupByExprs;
this.aggrExprs = aggrExprs;
this.cardinality = cardinality;
}
+ public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; }
+
public List<NamedExpression> getGroupByExprs() {
return groupByExprs;
}
@@ -69,7 +75,9 @@ public class HashAggregate extends AbstractSingle {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new HashAggregate(child, groupByExprs, aggrExprs, cardinality);
+ HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality);
+ newHAG.setMaxAllocation(getMaxAllocation());
+ return newHAG;
}
@Override
@@ -77,5 +85,18 @@ public class HashAggregate extends AbstractSingle {
return CoreOperatorType.HASH_AGGREGATE_VALUE;
}
-
+ /**
+ *
+ * @param maxAllocation The max memory allocation to be set
+ */
+ @Override
+ public void setMaxAllocation(long maxAllocation) {
+ this.maxAllocation = maxAllocation;
+ }
+ /**
+ * The Hash Aggregate operator supports spilling
+ * @return true
+ */
+ @Override
+ public boolean isBufferedOperator() { return true; }
}
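
For reference, this is how the hash-agg template later in this diff derives its spilling flags from the new "phase" property added to HashAggregate above; the wrapper class is hypothetical, the three assignments are copied from the template:

    import org.apache.drill.exec.planner.physical.AggPrelBase;

    // Illustrative only: phase-to-flags mapping used by HashAggTemplate.setup().
    class PhaseFlagsSketch {
      static void show(AggPrelBase.OperatorPhase aggPhase) {
        boolean is2ndPhase = aggPhase == AggPrelBase.OperatorPhase.PHASE_2of2;
        boolean isTwoPhase = aggPhase != AggPrelBase.OperatorPhase.PHASE_1of1;
        boolean canSpill = isTwoPhase;   // a single-phase (1of1) hash aggregate never spills
        System.out.println(is2ndPhase + " " + isTwoPhase + " " + canSpill);
      }
    }
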
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index dc913b1..97e0599 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -55,7 +56,6 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
-import com.google.common.collect.Lists;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JVar;
@@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class);
private HashAggregator aggregator;
- private final RecordBatch incoming;
+ private RecordBatch incoming;
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds;
private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch
private final List<Comparator> comparators;
private BatchSchema incomingSchema;
+ private boolean wasKilled;
private final GeneratorMapping UPDATE_AGGR_INSIDE =
GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
super(popConfig, context);
this.incoming = incoming;
+ wasKilled = false;
final int numGrpByExprs = popConfig.getGroupByExprs().size();
comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs);
@@ -136,15 +138,36 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return IterOutcome.NONE;
}
- if (aggregator.buildComplete() && !aggregator.allFlushed()) {
- // aggregation is complete and not all records have been output yet
- return aggregator.outputCurrentBatch();
+ // if aggregation is complete and not all records have been output yet
+ if (aggregator.buildComplete() ||
+ // or: the 1st phase needs to return (not fully grouped) partial output due to memory pressure
+ aggregator.earlyOutput()) {
+ // then output the next batch downstream
+ HashAggregator.AggIterOutcome aggOut = aggregator.outputCurrentBatch();
+ // if Batch returned, or end of data - then return the appropriate iter outcome
+ if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; }
+ if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; }
+ // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming
+ incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed
+ }
+
+ if (wasKilled) { // if kill() was called before, then finish up
+ aggregator.cleanup();
+ incoming.kill(false);
+ return IterOutcome.NONE;
}
- logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+ // Read and aggregate records
+ // ( may need to run again if the spilled partition that was read
+ // generated new partitions that were all spilled )
+ AggOutcome out;
+ do {
+ //
+ // Read incoming batches and process their records
+ //
+ out = aggregator.doWork();
+ } while (out == AggOutcome.CALL_WORK_AGAIN);
- AggOutcome out = aggregator.doWork();
- logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch (out) {
case CLEANUP_AND_RETURN:
container.zeroVectors();
@@ -153,6 +176,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
// fall through
case RETURN_OUTCOME:
return aggregator.getOutcome();
+
case UPDATE_AGGREGATOR:
context.fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged(
@@ -175,7 +199,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
* @return true if the aggregator was setup successfully. false if there was a failure.
*/
private boolean createAggregator() {
- logger.debug("Creating new aggregator.");
try {
stats.startSetup();
this.aggregator = createAggregatorInternal();
@@ -198,7 +221,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
top.plainJavaCapable(true);
// Uncomment out this line to debug the generated code.
-// top.saveCodeForDebugging(true);
+ // top.saveCodeForDebugging(true);
container.clear();
@@ -266,7 +289,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
agg.setup(popConfig, htConfig, context, this.stats,
- oContext.getAllocator(), incoming, this,
+ oContext, incoming, this,
aggrExprs,
cgInner.getWorkspaceTypes(),
groupByOutFieldIds,
@@ -314,6 +337,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
@Override
protected void killIncoming(boolean sendUpstream) {
+ wasKilled = true;
incoming.kill(sendUpstream);
}
}
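
Before the HashAggTemplate diff below, a small illustration of the partitionMask / bitsInMask fields it introduces: the idea is to split each row's hash code between partition selection and the per-partition hash table. The exact bit layout shown is an assumption, not a quote from the patch:

    // Illustrative only: how a 32-bit hash might be split when numPartitions = 32.
    public class PartitionMaskSketch {
      public static void main(String[] args) {
        int numPartitions = 32;                            // as chosen in delayedSetup()
        int partitionMask = numPartitions - 1;             // 0x1F
        int bitsInMask = Integer.bitCount(partitionMask);  // 5

        int hashCode = 0xCAFEBABE;
        int partition = hashCode & partitionMask;          // low bits pick the partition (assumed)
        int htHashCode = hashCode >>> bitsInMask;          // remaining bits feed the hash table (assumed)
        System.out.println(partition + " / " + htHashCode);
      }
    }
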
http://git-wip-us.apache.org/repos/asf/drill/blob/c16e5f80/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 1615200..38f0222 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,82 +18,155 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import javax.inject.Named;
+import com.google.common.base.Stopwatch;
+
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+
+import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
+
+import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
+
+import org.apache.drill.exec.physical.impl.spill.SpillSet;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
+
+import org.apache.drill.exec.proto.UserBitShared;
+
import org.apache.drill.exec.record.MaterializedField;
+
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatch.IterOutcome;
-import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.BatchSchema;
+
import org.apache.drill.exec.record.VectorContainer;
+
+import org.apache.drill.exec.record.TypedFieldId;
+
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.WritableBatch;
+
import org.apache.drill.exec.vector.AllocationHelper;
+
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
+
import org.apache.drill.exec.vector.VariableWidthVector;
+import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;
+
public abstract class HashAggTemplate implements HashAggregator {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+ protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
-// private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
-// private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;
- private static final int VARIABLE_WIDTH_VALUE_SIZE = 50;
+ private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50;
+ private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
private static final boolean EXTRA_DEBUG_1 = false;
private static final boolean EXTRA_DEBUG_2 = false;
-// private static final String TOO_BIG_ERROR =
-// "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
-// private boolean newSchema = false;
+ private static final boolean EXTRA_DEBUG_SPILL = false;
+
+ // Fields needed for partitioning (dividing the groups into partitions)
+ private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup())
+ private int partitionMask; // numPartitions - 1
+ private int bitsInMask; // number of bits in the MASK
+ private int nextPartitionToReturn = 0; // which partition to return the next batch from
+ // The following members are used for logging, metrics, etc.
+ private int rowsInPartition = 0; // counts #rows in each partition
+ private int rowsNotSpilled = 0;
+ private int rowsSpilled = 0;
+ private int rowsSpilledReturned = 0;
+ private int rowsReturnedEarly = 0;
+
+ private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
+ private boolean is2ndPhase = false;
+ private boolean canSpill = true; // set to false when spilling is not possible
+ private ChainedHashTable baseHashTable;
+ private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
+ private int earlyPartition = 0; // which partition to return early
+
+ private long memoryLimit; // max memory to be used by this operator
+ private long estMaxBatchSize = 0; // used for adjusting #partitions
+ private long estRowWidth = 0;
+ private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
+ private long minBatchesPerPartition; // for tuning - num partitions and spill decision
+ private long plannedBatches = 0; // account for planned, but not yet allocated batches
+
private int underlyingIndex = 0;
private int currentIndex = 0;
private IterOutcome outcome;
-// private int outputCount = 0;
private int numGroupedRecords = 0;
- private int outBatchIndex = 0;
+ private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
+
private int lastBatchOutputCount = 0;
private RecordBatch incoming;
-// private BatchSchema schema;
+ private BatchSchema schema;
private HashAggBatch outgoing;
private VectorContainer outContainer;
-// private FragmentContext context;
+
+ private FragmentContext context;
+ private OperatorContext oContext;
private BufferAllocator allocator;
-// private HashAggregate hashAggrConfig;
- private HashTable htable;
- private ArrayList<BatchHolder> batchHolders;
+ private HashTable htables[];
+ private ArrayList<BatchHolder> batchHolders[];
+ private int outBatchIndex[];
+
+ // For handling spilling
+ private SpillSet spillSet;
+ SpilledRecordbatch newIncoming; // when reading a spilled file - works like an "incoming"
+ private OutputStream outputStream[]; // an output stream for each spilled partition
+ private int spilledBatchesCount[]; // count number of batches spilled, in each partition
+ private String spillFiles[];
+ private int cycleNum = 0; // primary, secondary, tertiary, etc.
+ private int originalPartition = -1; // the partition a secondary reads from
+
+ private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
+
+ private ArrayList<SpilledPartition> spilledPartitionsList;
+ private int operatorId; // for the spill file name
+
private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put()
private IndexPointer outStartIdxHolder;
private IndexPointer outNumRecordsHolder;
private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields
-
- ErrorCollector collector = new ErrorCollectorImpl();
+ private TypedFieldId[] groupByOutFieldIds;
private MaterializedField[] materializedValueFields;
private boolean allFlushed = false;
private boolean buildComplete = false;
+ private boolean handlingSpills = false; // True once starting to process spill files
private OperatorStats stats = null;
private HashTableStats htStats = new HashTableStats();
@@ -103,7 +176,15 @@ public abstract class HashAggTemplate implements HashAggregator {
NUM_BUCKETS,
NUM_ENTRIES,
NUM_RESIZING,
- RESIZING_TIME;
+ RESIZING_TIME,
+ NUM_PARTITIONS,
+ SPILLED_PARTITIONS, // number of partitions spilled to disk
+ SPILL_MB, // Number of MB of data spilled to disk. This amount is first written,
+ // then later re-read. So, disk I/O is twice this amount.
+ // For first phase aggr -- this is an estimate of the amount of data
+ // returned early (analogous to a spill in the 2nd phase).
+ SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY
+ ;
// duplicate for hash ag
@@ -121,7 +202,6 @@ public abstract class HashAggTemplate implements HashAggregator {
private int batchOutputCount = 0;
private int capacity = Integer.MAX_VALUE;
- private boolean allocatedNextBatch = false;
@SuppressWarnings("resource")
public BatchHolder() {
@@ -145,8 +225,8 @@ public abstract class HashAggTemplate implements HashAggregator {
if (vector instanceof FixedWidthVector) {
((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE);
} else if (vector instanceof VariableWidthVector) {
- ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE,
- HashTable.BATCH_SIZE);
+ // This case is never used: a varchar falls under ObjectVector, which is allocated on the heap!
+ ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE);
} else if (vector instanceof ObjectVector) {
((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE);
} else {
@@ -166,20 +246,23 @@ public abstract class HashAggTemplate implements HashAggregator {
}
private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
- updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
+ try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); }
maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch);
return true;
}
private void setup() {
- setupInterior(incoming, outgoing, aggrValuesContainer);
+ try { setupInterior(incoming, outgoing, aggrValuesContainer); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
}
private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) {
outStartIdxHolder.value = batchOutputCount;
outNumRecordsHolder.value = 0;
for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) {
- outputRecordValues(i, batchOutputCount);
+ try { outputRecordValues(i, batchOutputCount); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
if (EXTRA_DEBUG_2) {
logger.debug("Outputting values to output index: {}", batchOutputCount);
}
@@ -204,24 +287,23 @@ public abstract class HashAggTemplate implements HashAggregator {
@RuntimeOverridden
public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing,
- @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) {
+ @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException {
}
@RuntimeOverridden
- public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) {
+ public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{
}
@RuntimeOverridden
- public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) {
+ public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{
}
}
-
@Override
public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
- OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
- VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException {
+ OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
+ VectorContainer outContainer) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -230,15 +312,34 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Wrong number of workspace variables.");
}
-// this.context = context;
+ this.context = context;
this.stats = stats;
- this.allocator = allocator;
+ this.allocator = oContext.getAllocator();
+ this.oContext = oContext;
this.incoming = incoming;
-// this.schema = incoming.getSchema();
this.outgoing = outgoing;
this.outContainer = outContainer;
+ this.operatorId = hashAggrConfig.getOperatorId();
+
+ is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
+ isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+ canSpill = isTwoPhase; // single phase can not spill
+
+ // Typically used for testing: force a spill after a partition has more than this many batches
+ minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+
+ // Set the memory limit
+ memoryLimit = allocator.getLimit();
+ // Optional configured memory limit, typically used only for testing.
+ long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+ if (configLimit > 0) {
+ logger.warn("Memory limit was changed to {}",configLimit);
+ memoryLimit = Math.min(memoryLimit, configLimit);
+ allocator.setLimit(memoryLimit); // enforce at the allocator
+ }
-// this.hashAggrConfig = hashAggrConfig;
+ // All the settings that require the number of partitions were moved into delayedSetup(),
+ // which is called later, after the actual data first arrives
// currently, hash aggregation is only applicable if there are group-by expressions.
// For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no
@@ -266,112 +367,278 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- ChainedHashTable ht =
+ spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE);
+ baseHashTable =
new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
- this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
-
+ this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
numGroupByOutFields = groupByOutFieldIds.length;
- batchHolders = new ArrayList<BatchHolder>();
- // First BatchHolder is created when the first put request is received.
doSetup(incoming);
}
+ /**
+ * Delayed setup covers the parts of setup() that can only be performed after actual data arrives on the incoming batch.
+ * That data is used to compute the number of partitions.
+ */
+ private void delayedSetup() {
+
+ // Set the number of partitions from the configuration (raise to a power of two, if needed)
+ numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
+ if ( numPartitions == 1 ) {
+ canSpill = false;
+ logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1");
+ }
+ numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
+
+ if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+ else {
+ // Estimate the max batch size; should use actual data (e.g. lengths of varchars)
+ updateEstMaxBatchSize(incoming);
+ }
+ long memAvail = memoryLimit - allocator.getAllocatedMemory();
+ if ( !canSpill ) { // single phase, or spill disabled by configuration
+ numPartitions = 1; // single phase should use only a single partition (to save memory)
+ } else { // two phase
+ // Adjust the number of partitions down if needed - when the available memory cannot hold that
+ // many batches (a configurable option), plus overhead (e.g. hash table, links, hash values)
+ while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) {
+ numPartitions /= 2;
+ if ( numPartitions < 2) {
+ if ( is2ndPhase ) {
+ canSpill = false; // 2nd phase needs at least 2 to make progress
+ logger.warn("Spilling was disabled - not enough memory available for internal partitioning");
+ }
+ break;
+ }
+ }
+ }
+ logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+ numPartitions, canSpill ? "Can" : "Cannot");
+
+ // The following initial safety check should be revisited once we can lower the number of rows in a batch
+ // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table)
+ if ( numPartitions == 1 ) {
+ // if too little memory - behave like the old code -- no memory limit for hash aggregate
+ allocator.setLimit(AbstractBase.MAX_ALLOCATION); // 10_000_000_000L
+ }
+ // Based on the number of partitions: Set the mask and bit count
+ partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
+ bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
+
+ // Create arrays (one entry per partition)
+ htables = new HashTable[numPartitions] ;
+ batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
+ outBatchIndex = new int[numPartitions] ;
+ outputStream = new OutputStream[numPartitions];
+ spilledBatchesCount = new int[numPartitions];
+ spillFiles = new String[numPartitions];
+ spilledPartitionsList = new ArrayList<SpilledPartition>();
+
+ plannedBatches = numPartitions; // each partition should allocate its first batch
+
+ // initialize every (per partition) entry in the arrays
+ for (int i = 0; i < numPartitions; i++ ) {
+ try {
+ this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
+ this.htables[i].setMaxVarcharSize(maxColumnWidth);
+ } catch (ClassTransformationException e) {
+ throw UserException.unsupportedError(e)
+ .message("Code generation error - likely an error in the code.")
+ .build(logger);
+ } catch (IOException e) {
+ throw UserException.resourceError(e)
+ .message("IO Error while creating a hash table.")
+ .build(logger);
+ } catch (SchemaChangeException sce) {
+ throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
+ }
+ this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+ }
+ }
+ /**
+ * Get the new incoming (used when reading spilled files as if they were an "incoming").
+ * @return The (newly replaced) incoming
+ */
+ @Override
+ public RecordBatch getNewIncoming() { return newIncoming; }
+
+ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException {
+ baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming
+ this.incoming = newIncoming;
+ currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file
+ nextPartitionToReturn = 0;
+ for (int i = 0; i < numPartitions; i++ ) {
+ htables[i].reinit(newIncoming);
+ if ( batchHolders[i] != null) {
+ for (BatchHolder bh : batchHolders[i]) {
+ bh.clear();
+ }
+ batchHolders[i].clear();
+ batchHolders[i] = new ArrayList<BatchHolder>();
+ }
+ outBatchIndex[i] = 0;
+ outputStream[i] = null;
+ spilledBatchesCount[i] = 0;
+ spillFiles[i] = null;
+ }
+ }
+
+ /**
+ * Update the estimated max batch size used by the Hash Aggregate operator,
+ * using the record batch sizer to get the row width.
+ * @param incoming
+ */
+ private void updateEstMaxBatchSize(RecordBatch incoming) {
+ if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
+ RecordBatchSizer sizer = new RecordBatchSizer(incoming);
+ logger.trace("Incoming sizer: {}",sizer);
+ // An empty batch has only the schema, so the actual varchar lengths are unknown;
+ // otherwise use the actual varchar lengths, each capped at 50 (to match the space allocation)
+ estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
+ estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+
+ // Get approx max (varchar) column width to get better memory allocation
+ maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+ maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
+
+ logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
+ isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+
+ if ( estMaxBatchSize > memoryLimit ) {
+ logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+ }
+ }
+
+ /**
+ * Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
+ * Once complete, get the incoming NEXT batch and process it as well, etc.
+ * For 1st phase, may return when an early output needs to be performed.
+ *
+ * @return Agg outcome status
+ */
@Override
public AggOutcome doWork() {
- try {
- // Note: Keeping the outer and inner try blocks here to maintain some similarity with
- // StreamingAggregate which does somethings conditionally in the outer try block.
- // In the future HashAggregate may also need to perform some actions conditionally
- // in the outer try block.
-
- outside:
- while (true) {
- // loop through existing records, aggregating the values as necessary.
- if (EXTRA_DEBUG_1) {
- logger.debug("Starting outer loop of doWork()...");
+
+ while (true) {
+
+ // This would be called only once - first time actual data arrives on incoming
+ if ( schema == null && incoming.getRecordCount() > 0 ) {
+ this.schema = incoming.getSchema();
+ currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
+ // Calculate the number of partitions based on actual incoming data
+ delayedSetup();
+ }
+
+ //
+ // loop through existing records in this batch, aggregating the values as necessary.
+ //
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Starting outer loop of doWork()...");
+ }
+ for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+ if (EXTRA_DEBUG_2) {
+ logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
}
- for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
- if (EXTRA_DEBUG_2) {
- logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
- }
- checkGroupAndAggrValues(currentIndex);
+ checkGroupAndAggrValues(currentIndex);
+ // If adding a group detected memory pressure during the 1st phase, then start
+ // outputting some partition downstream in order to free memory.
+ if ( earlyOutput ) {
+ outputCurrentBatch();
+ incIndex(); // next time continue with the next incoming row
+ return AggOutcome.RETURN_OUTCOME;
}
+ }
+
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Processed {} records", underlyingIndex);
+ }
- if (EXTRA_DEBUG_1) {
- logger.debug("Processed {} records", underlyingIndex);
+ // Cleanup the previous batch since we are done processing it.
+ for (VectorWrapper<?> v : incoming) {
+ v.getValueVector().clear();
+ }
+ //
+ // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
+ // from one of the spill files (The spill case is handled differently here to avoid
+ // collecting stats on the spilled records)
+ //
+ if ( handlingSpills ) {
+ outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+ } else {
+ long beforeAlloc = allocator.getAllocatedMemory();
+
+ // Get the next RecordBatch from the incoming (i.e. upstream operator)
+ outcome = outgoing.next(0, incoming);
+
+ // If incoming batch is bigger than our estimate - adjust the estimate to match
+ long afterAlloc = allocator.getAllocatedMemory();
+ long incomingBatchSize = afterAlloc - beforeAlloc;
+ if ( estMaxBatchSize < incomingBatchSize) {
+ logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
+ estMaxBatchSize = incomingBatchSize;
}
+ }
- try {
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received IterOutcome of {}", outcome);
+ }
- while (true) {
- // Cleanup the previous batch since we are done processing it.
- for (VectorWrapper<?> v : incoming) {
- v.getValueVector().clear();
- }
- IterOutcome out = outgoing.next(0, incoming);
- if (EXTRA_DEBUG_1) {
- logger.debug("Received IterOutcome of {}", out);
- }
- switch (out) {
- case OUT_OF_MEMORY:
- case NOT_YET:
- this.outcome = out;
- return AggOutcome.RETURN_OUTCOME;
-
- case OK_NEW_SCHEMA:
- if (EXTRA_DEBUG_1) {
- logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
- }
-// newSchema = true;
- this.cleanup();
- // TODO: new schema case needs to be handled appropriately
- return AggOutcome.UPDATE_AGGREGATOR;
-
- case OK:
- resetIndex();
- if (incoming.getRecordCount() == 0) {
- continue;
- } else {
- checkGroupAndAggrValues(currentIndex);
- incIndex();
-
- if (EXTRA_DEBUG_1) {
- logger.debug("Continuing outside loop");
- }
- continue outside;
- }
-
- case NONE:
- // outcome = out;
-
- buildComplete = true;
-
- updateStats(htable);
-
- // output the first batch; remaining batches will be output
- // in response to each next() call by a downstream operator
-
- outputCurrentBatch();
-
- // return setOkAndReturn();
- return AggOutcome.RETURN_OUTCOME;
-
- case STOP:
- default:
- outcome = out;
- return AggOutcome.CLEANUP_AND_RETURN;
- }
+ // Handle various results from getting the next batch
+ switch (outcome) {
+ case OUT_OF_MEMORY:
+ case NOT_YET:
+ return AggOutcome.RETURN_OUTCOME;
+
+ case OK_NEW_SCHEMA:
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount());
}
+ this.cleanup();
+ // TODO: new schema case needs to be handled appropriately
+ return AggOutcome.UPDATE_AGGREGATOR;
- } finally {
- // placeholder...
- }
+ case OK:
+ currentBatchRecordCount = incoming.getRecordCount(); // size of next batch
+
+ resetIndex(); // initialize index (a new batch needs to be processed)
+
+ if (EXTRA_DEBUG_1) {
+ logger.debug("Continue to start processing the next batch");
+ }
+ break;
+
+ case NONE:
+ resetIndex(); // initialize index (in case spill files need to be processed)
+
+ buildComplete = true;
+
+ updateStats(htables);
+
+ // output the first batch; remaining batches will be output
+ // in response to each next() call by a downstream operator
+ AggIterOutcome aggOutcome = outputCurrentBatch();
+
+ if ( aggOutcome == AggIterOutcome.AGG_RESTART ) {
+ // Output of first batch returned a RESTART (all new partitions were spilled)
+ return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition
+ }
+
+ if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; }
+
+ return AggOutcome.RETURN_OUTCOME;
+
+ case STOP:
+ default:
+ return AggOutcome.CLEANUP_AND_RETURN;
}
- } finally {
}
}
+ /**
+ * Allocate space for the returned aggregate columns
+ * (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
+ * @param records
+ */
private void allocateOutgoing(int records) {
// Skip the keys and only allocate for outputting the workspace values
// (keys will be output through splitAndTransfer)
@@ -382,14 +649,8 @@ public abstract class HashAggTemplate implements HashAggregator {
while (outgoingIter.hasNext()) {
@SuppressWarnings("resource")
ValueVector vv = outgoingIter.next().getValueVector();
-// MajorType type = vv.getField().getType();
- /*
- * In build schema we use the allocation model that specifies exact record count
- * so we need to stick with that allocation model until DRILL-2211 is resolved. Using
- * 50 as the average bytes per value as is used in HashTable.
- */
- AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0);
+ AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
}
}
@@ -400,45 +661,82 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public int getOutputCount() {
- // return outputCount;
return lastBatchOutputCount;
}
@Override
public void cleanup() {
- if (htable != null) {
- htable.clear();
- htable = null;
+ if ( schema == null ) { return; } // not set up; nothing to clean
+ if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+ (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+ }
+ // clean (and deallocate) each partition
+ for ( int i = 0; i < numPartitions; i++) {
+ if (htables[i] != null) {
+ htables[i].clear();
+ htables[i] = null;
+ }
+ if ( batchHolders[i] != null) {
+ for (BatchHolder bh : batchHolders[i]) {
+ bh.clear();
+ }
+ batchHolders[i].clear();
+ batchHolders[i] = null;
+ }
+
+ // delete any (still active) output spill file
+ if ( outputStream[i] != null && spillFiles[i] != null) {
+ try {
+ outputStream[i].close();
+ outputStream[i] = null;
+ spillSet.delete(spillFiles[i]);
+ spillFiles[i] = null;
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
+ }
+ }
}
+ // delete any spill file left in unread spilled partitions
+ while ( ! spilledPartitionsList.isEmpty() ) {
+ SpilledPartition sp = spilledPartitionsList.remove(0);
+ try {
+ spillSet.delete(sp.spillFile);
+ } catch(IOException e) {
+ logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile);
+ }
+ }
+ // Delete the currently handled (if any) spilled file
+ if ( newIncoming != null ) { newIncoming.close(); }
+ spillSet.close(); // delete the spill directory(ies)
htIdxHolder = null;
materializedValueFields = null;
outStartIdxHolder = null;
outNumRecordsHolder = null;
+ }
- if (batchHolders != null) {
- for (BatchHolder bh : batchHolders) {
+ // First free the memory used by the given (spilled) partition (i.e., hash table plus batches)
+ // then reallocate them in pristine state to allow the partition to continue receiving rows
+ private void reinitPartition(int part) /* throws SchemaChangeException /*, IOException */ {
+ assert htables[part] != null;
+ htables[part].reset();
+ if ( batchHolders[part] != null) {
+ for (BatchHolder bh : batchHolders[part]) {
bh.clear();
}
- batchHolders.clear();
- batchHolders = null;
+ batchHolders[part].clear();
}
+ batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
}
-// private final AggOutcome setOkAndReturn() {
-// this.outcome = IterOutcome.OK;
-// for (VectorWrapper<?> v : outgoing) {
-// v.getValueVector().getMutator().setValueCount(outputCount);
-// }
-// return AggOutcome.RETURN_OUTCOME;
-// }
-
private final void incIndex() {
underlyingIndex++;
- if (underlyingIndex >= incoming.getRecordCount()) {
+ if (underlyingIndex >= currentBatchRecordCount) {
currentIndex = Integer.MAX_VALUE;
return;
}
- currentIndex = getVectorIndex(underlyingIndex);
+ try { currentIndex = getVectorIndex(underlyingIndex); }
+ catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);}
}
private final void resetIndex() {
@@ -446,71 +744,337 @@ public abstract class HashAggTemplate implements HashAggregator {
incIndex();
}
- private void addBatchHolder() {
+ private boolean isSpilled(int part) {
+ return outputStream[part] != null;
+ }
+ /**
+ * Which partition to choose for flushing out (i.e. spill or return)?
+ * - The current partition (to which a new batch holder is added) has priority,
+ * because its last batch holder is full.
+ * - Also the largest prior spilled partition has some priority, as it is already spilled;
+ * but spilling too few rows (e.g. a single batch) gets us nothing.
+ * - So the largest non-spilled partition has some priority, to get more memory freed.
+ * Need to weigh the above three options.
+ *
+ * @param currPart - The partition that hit the memory limit (gets a priority)
+ * @return The partition (number) chosen to be spilled
+ */
+ private int chooseAPartitionToFlush(int currPart) {
+ if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+ int currPartSize = batchHolders[currPart].size();
+ if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
+ // first find the largest spilled partition
+ int maxSizeSpilled = -1;
+ int indexMaxSpilled = -1;
+ for (int isp = 0; isp < numPartitions; isp++ ) {
+ if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+ maxSizeSpilled = batchHolders[isp].size();
+ indexMaxSpilled = isp;
+ }
+ }
+ // Give the current (if already spilled) some priority
+ if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+ maxSizeSpilled = currPartSize ;
+ indexMaxSpilled = currPart;
+ }
+ // now find the largest non-spilled partition
+ int maxSize = -1;
+ int indexMax = -1;
+ // Use the largest spilled (if found) as a base line, with a factor of 4
+ if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+ indexMax = indexMaxSpilled;
+ maxSize = 4 * maxSizeSpilled ;
+ }
+ for ( int insp = 0; insp < numPartitions; insp++) {
+ if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+ indexMax = insp;
+ maxSize = batchHolders[insp].size();
+ }
+ }
+ // again - priority to the current partition
+ if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+ return currPart;
+ }
+ if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
+ return -1; // try skipping this spill
+ }
+ return indexMax;
+ }
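A standalone sketch of the weighing above (not part of the patch; the partition sizes, spill flags, and class name are made-up inputs for illustration, and the 1st-phase shortcut of simply returning the current partition is omitted):

  class FlushChoiceSketch {
    static int choosePartition(int[] size, boolean[] spilled, int currPart) {
      int currSize = (size[currPart] == 1) ? -1 : size[currPart];   // a single batch is not worth flushing
      int maxSpilled = -1, idxSpilled = -1;
      for (int p = 0; p < size.length; p++) {                       // find the largest already-spilled partition
        if (spilled[p] && size[p] > maxSpilled) { maxSpilled = size[p]; idxSpilled = p; }
      }
      if (spilled[currPart] && currSize + 1 >= maxSpilled) {        // current partition tie-break (if spilled)
        maxSpilled = currSize; idxSpilled = currPart;
      }
      int max = -1, idx = -1;
      if (idxSpilled > -1 && maxSpilled > 1) { idx = idxSpilled; max = 4 * maxSpilled; } // spilled baseline, weighted x4
      for (int p = 0; p < size.length; p++) {                       // find the largest non-spilled partition
        if (!spilled[p] && size[p] > max) { max = size[p]; idx = p; }
      }
      if (!spilled[currPart] && currSize + 1 >= max) { return currPart; } // current tie-break (if not spilled)
      return (max <= 1) ? -1 : idx;                                 // -1: nothing is worth flushing yet
    }
    public static void main(String[] args) {
      // sizes in batches; only partition 0 was spilled before; partition 1 just filled up
      System.out.println(choosePartition(new int[]{5, 2, 9, 3}, new boolean[]{true, false, false, false}, 1)); // prints 0
    }
  }

With sizes {5, 2, 9, 3} batches, only partition 0 previously spilled, and current partition 1, the spilled baseline becomes 4 * 5 = 20, no non-spilled partition beats it, so partition 0 is chosen again.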
+
+ /**
+ * Iterate through the batches of the given partition, writing them to a file
+ *
+ * @param part The partition (number) to spill
+ */
+ private void spillAPartition(int part) {
+
+ ArrayList<BatchHolder> currPartition = batchHolders[part];
+ rowsInPartition = 0;
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+ }
+
+ if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+ // If this is the first spill for this partition, create an output stream
+ if ( ! isSpilled(part) ) {
+
+ spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
+
+ try {
+ outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("Hash Aggregation failed to open spill file: " + spillFiles[part])
+ .build(logger);
+ }
+ }
+
+ for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+ // get the number of records in the batch holder that are pending output
+ int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+ rowsInPartition += numPendingOutput; // for logging
+ rowsSpilled += numPendingOutput;
+
+ allocateOutgoing(numPendingOutput);
+
+ currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ int numOutputRecords = outNumRecordsHolder.value;
+
+ this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+ // set the value count for outgoing batch value vectors
+ /* int i = 0; */
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(numOutputRecords);
+ /*
+ // print out the first row to be spilled ( varchar, varchar, bigint )
+ try {
+ if (i++ < 2) {
+ NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ } else {
+ NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+ logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+ }
+ } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
+ */
+ }
+
+ outContainer.setRecordCount(numPendingOutput);
+ WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+ VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+ Stopwatch watch = Stopwatch.createStarted();
+ try {
+ outputBatch.writeToStream(outputStream[part]);
+ } catch (IOException ioe) {
+ throw UserException.dataWriteError(ioe)
+ .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+ .build(logger);
+ }
+ outContainer.zeroVectors();
+ logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+ }
+
+ spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+ logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+ }
+
+ private void addBatchHolder(int part) {
+
BatchHolder bh = newBatchHolder();
- batchHolders.add(bh);
+ batchHolders[part].add(bh);
if (EXTRA_DEBUG_1) {
- logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+ logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
}
bh.setup();
}
- // Overridden in the generated class when created as plain Java code.
-
+ // These methods are overridden in the generated class when created as plain Java code.
protected BatchHolder newBatchHolder() {
return new BatchHolder();
}
+ /**
+ * Output the next batch from partition "nextPartitionToReturn"
+ *
+ * @return iteration outcome (e.g., OK, NONE ...)
+ */
@Override
- public IterOutcome outputCurrentBatch() {
- if (outBatchIndex >= batchHolders.size()) {
- this.outcome = IterOutcome.NONE;
- return outcome;
+ public AggIterOutcome outputCurrentBatch() {
+
+ // when incoming was an empty batch, just finish up
+ if ( schema == null ) {
+ logger.trace("Incoming was empty; output is an empty batch.");
+ this.outcome = IterOutcome.NONE; // no records were read
+ allFlushed = true;
+ return AggIterOutcome.AGG_NONE;
}
- // get the number of records in the batch holder that are pending output
- int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
+ // Initialization (covers the case of early output)
+ ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
+ int currOutBatchIndex = outBatchIndex[earlyPartition];
+ int partitionToReturn = earlyPartition;
+
+ if ( ! earlyOutput ) {
+ // Update the next partition to return (if needed)
+ // skip fully returned (or spilled) partitions
+ while (nextPartitionToReturn < numPartitions) {
+ //
+ // If this partition was spilled - spill the rest of it and skip it
+ //
+ if ( isSpilled(nextPartitionToReturn) ) {
+ spillAPartition(nextPartitionToReturn); // spill the rest
+ SpilledPartition sp = new SpilledPartition();
+ sp.spillFile = spillFiles[nextPartitionToReturn];
+ sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn];
+ sp.cycleNum = cycleNum; // remember the current cycle
+ sp.origPartn = nextPartitionToReturn; // for debugging / filename
+ sp.prevOrigPartn = originalPartition; // for debugging / filename
+ spilledPartitionsList.add(sp);
+
+ reinitPartition(nextPartitionToReturn); // free the memory
+ long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
+ spillSet.tallyWriteBytes(posn); // for the IO stats
+ try {
+ outputStream[nextPartitionToReturn].close();
+ } catch (IOException ioe) {
+ throw UserException.resourceError(ioe)
+ .message("IO Error while closing output stream")
+ .build(logger);
+ }
+ outputStream[nextPartitionToReturn] = null;
+ }
+ else {
+ currPartition = batchHolders[nextPartitionToReturn];
+ currOutBatchIndex = outBatchIndex[nextPartitionToReturn];
+ // If curr batch (partition X index) is not empty - proceed to return it
+ if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) {
+ break;
+ }
+ }
+ nextPartitionToReturn++; // else check next partition
+ }
+
+ // if passed the last partition - either done or need to restart and read spilled partitions
+ if (nextPartitionToReturn >= numPartitions) {
+ // The following "if" is probably never taken, due to a similar check at the end of this method
+ if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions
+ allFlushed = true;
+ this.outcome = IterOutcome.NONE;
+ if ( is2ndPhase ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
+ (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
+ }
+ return AggIterOutcome.AGG_NONE; // then return NONE
+ }
+ // Else - there are still spilled partitions to process - pick one and handle just like a new incoming
+ buildComplete = false; // go back and call doWork() again
+ handlingSpills = true; // beginning to work on the spill files
+ // pick a spilled partition; set a new incoming ...
+ SpilledPartition sp = spilledPartitionsList.remove(0);
+ // Create a new "incoming" out of the spilled partition spill file
+ newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet);
+ originalPartition = sp.origPartn; // used for the filename
+ logger.trace("Reading back spilled original partition {} as an incoming",originalPartition);
+ // Initialize .... new incoming, new set of partitions
+ try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); }
+ // update the cycle num if needed
+ // The current cycle num should always be one larger than in the spilled partition
+ if ( cycleNum == sp.cycleNum ) {
+ cycleNum = 1 + sp.cycleNum;
+ stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats
+ // report first spill or memory stressful situations
+ if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); }
+ if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); }
+ if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); }
+ if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); }
+ if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); }
+ }
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). More {} spilled partitions left.",
+ sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size());
+ }
+ return AggIterOutcome.AGG_RESTART;
+ }
+
+ partitionToReturn = nextPartitionToReturn ;
- if (numPendingOutput == 0) {
- this.outcome = IterOutcome.NONE;
- return outcome;
}
+ // get the number of records in the batch holder that are pending output
+ int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+ // The following accounting is for logging, metrics, etc.
+ rowsInPartition += numPendingOutput ;
+ if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; }
+ else { rowsSpilledReturned += numPendingOutput; }
+ if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; }
+
allocateOutgoing(numPendingOutput);
- batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+ currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
int numOutputRecords = outNumRecordsHolder.value;
if (EXTRA_DEBUG_1) {
logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value);
}
- this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+ this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
// set the value count for outgoing batch value vectors
for (VectorWrapper<?> v : outgoing) {
v.getValueVector().getMutator().setValueCount(numOutputRecords);
}
-// outputCount += numOutputRecords;
-
this.outcome = IterOutcome.OK;
- logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords);
+ if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+ logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
+ rowsNotSpilled+rowsSpilledReturned,
+ rowsSpilled);
+ }
lastBatchOutputCount = numOutputRecords;
- outBatchIndex++;
- if (outBatchIndex == batchHolders.size()) {
- allFlushed = true;
+ outBatchIndex[partitionToReturn]++;
+ // if just flushed the last batch in the partition
+ if (outBatchIndex[partitionToReturn] == currPartition.size()) {
+
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows",
+ earlyOutput ? "(Early)" : "",
+ partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition);
+ }
+ rowsInPartition = 0; // reset to count for the next partition
+
+ // deallocate memory used by this partition, and re-initialize
+ reinitPartition(partitionToReturn);
- logger.debug("HashAggregate: All batches flushed.");
+ if ( earlyOutput ) {
- // cleanup my internal state since there is nothing more to return
- this.cleanup();
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory());
+ }
+ outBatchIndex[earlyPartition] = 0; // reset, for next time
+ earlyOutput = false ; // done with early output
+ }
+ else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ?
+
+ allFlushed = true; // next next() call will return NONE
+
+ logger.trace("HashAggregate: All batches flushed.");
+
+ // cleanup my internal state since there is nothing more to return
+ this.cleanup();
+ }
}
- return this.outcome;
+ return AggIterOutcome.AGG_OK;
}
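For orientation, one plausible shape of the caller's handling of the new AggIterOutcome values. This is a hypothetical sketch only: the method name drainAggregator and the reAggregateSpilledInput callback are invented for illustration; the actual driving logic lives in HashAggBatch elsewhere in this patch.

  // Hypothetical caller shape (illustration only; the real driving logic is in HashAggBatch).
  // AGG_RESTART means a spilled partition was re-opened as the new "incoming", so the
  // aggregation phase must run again before more output batches can be produced.
  static IterOutcome drainAggregator(HashAggregator aggregator, Runnable reAggregateSpilledInput) {
    while (true) {
      switch (aggregator.outputCurrentBatch()) {
        case AGG_OK:                                // one output batch is ready in the outgoing container
          return IterOutcome.OK;
        case AGG_NONE:                              // all partitions, including spilled ones, were returned
          return IterOutcome.NONE;
        case AGG_RESTART:                           // re-aggregate the spilled partition just re-read,
          reAggregateSpilledInput.run();            // then loop back and try to produce output again
          break;
        default:
          throw new IllegalStateException("Unexpected aggregation outcome");
      }
    }
  }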
@Override
@@ -522,11 +1086,33 @@ public abstract class HashAggTemplate implements HashAggregator {
public boolean buildComplete() {
return buildComplete;
}
+ @Override
+ public boolean earlyOutput() { return earlyOutput; }
public int numGroupedRecords() {
return numGroupedRecords;
}
+ /**
+ * Generate a detailed error message in case of an "Out of Memory" condition
+ * @return the detailed error message
+ */
+ private String getOOMErrorMsg() {
+ String errmsg;
+ if ( !isTwoPhase ) {
+ errmsg = "Single Phase Hash Aggregate operator can not spill." ;
+ } else if ( ! canSpill ) { // 2nd phase, with only 1 partition
+ errmsg = "Too little memory available to operator to facilitate spilling.";
+ } else { // a bug ?
+ errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
+ ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+ if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
+ }
+ errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
+
+ return errmsg;
+ }
+
// Check if a group is present in the hash table; if not, insert it in the hash table.
// The htIdxHolder contains the index of the group in the hash table container; this same
// index is also used for the aggregation values maintained by the hash aggregate.
@@ -535,6 +1121,8 @@ public abstract class HashAggTemplate implements HashAggregator {
throw new IllegalArgumentException("Invalid incoming row index.");
}
+ assert ! earlyOutput;
+
/** for debugging
Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector();
BigIntVector vv0 = null;
@@ -546,44 +1134,189 @@ public abstract class HashAggTemplate implements HashAggregator {
holder.value = vv0.getAccessor().get(incomingRowIdx) ;
}
*/
+ /*
+ if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
+ // for debugging -- show the first row from a spilled batch
+ Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector();
+ Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector();
+ Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector();
+
+ if (tmp0 != null && tmp1 != null && tmp2 != null) {
+ NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0);
+ NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1);
+ NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2);
+ logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx));
+ }
+ }
+ */
+ // The hash code is computed once, then its lower bits are used to determine the
+ // partition to use, and the higher bits determine the location in the hash table.
+ int hashCode;
+ try {
+ htables[0].updateBatches();
+ hashCode = htables[0].getHashCode(incomingRowIdx);
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException("Unexpected schema change", e);
+ }
- htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */);
+ // right shift hash code for secondary (or tertiary...) spilling
+ for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; }
- int currentIdx = htIdxHolder.value;
+ int currentPartition = hashCode & partitionMask ;
+ hashCode >>>= bitsInMask;
+ HashTable.PutStatus putStatus = null;
+ long allocatedBefore = allocator.getAllocatedMemory();
- // get the batch index and index within the batch
- if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) {
- addBatchHolder();
+ // Insert the key columns into the hash table
+ try {
+ putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+ } catch (SchemaChangeException e) {
+ throw new UnsupportedOperationException("Unexpected schema change", e);
}
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK);
- int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+ int currentIdx = htIdxHolder.value;
- // Check if we have almost filled up the workspace vectors and add a batch if necessary
- if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
- htable.addNewKeyBatch();
- addBatchHolder();
- bh.allocatedNextBatch = true;
+ long addedMem = allocator.getAllocatedMemory() - allocatedBefore;
+ if ( addedMem > 0 ) {
+ logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition);
}
+ // Check whether put() added a new batch (for the keys) inside the hash table; if so, a matching
+ // batch (for the aggregate columns) needs to be created
+ if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
+ try {
+ long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+
+ addBatchHolder(currentPartition);
+
+ if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
+ long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
+ logger.trace("MEMORY CHECK AGG: added {} total (with HT) added {}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem);
+ // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+ if ( totalAddedMem > estMaxBatchSize ) {
+ logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
+ estMaxBatchSize = totalAddedMem;
+ }
+ } catch (OutOfMemoryException exc) {
+ throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill
+ }
+ }
+ BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK);
+ int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) {
numGroupedRecords++;
}
+
+ // ===================================================================================
+ // If the last batch just became full, this is the time to check the memory limits!
+ // If exceeded, we need to spill (in the 2nd phase) or output a partition early (in the 1st).
+ // (Skip this check if spilling is not possible; in that case an OOM may be hit later.)
+ // ===================================================================================
+ if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) {
+
+ plannedBatches++; // planning to allocate one more batch
+
+ // calculate the (max) new memory needed now
+ long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
+ for ( HashTable ht : htables ) {
+ hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize();
+ }
+
+ // Plan ahead for at least MIN batches, to account for size changing, and some overhead
+ long maxMemoryNeeded = minBatchesPerPartition * plannedBatches *
+ ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) +
+ hashTableDoublingSizeNeeded;
+
+ // log a detailed debug message explaining why a spill may be needed
+ logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
+ "Memory needed {}, Est batch size {}, mem limit {}",
+ allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
+ batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
+ //
+ // Spill if the allocated memory plus the memory needed exceeds the memory limit.
+ //
+ if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+
+ // Pick a "victim" partition to spill or return
+ int victimPartition = chooseAPartitionToFlush(currentPartition);
+
+ // In case no partition has more than one batch -- try and "push the limits"; maybe next
+ // time the spill could work.
+ if ( victimPartition < 0 ) { return; }
+
+ if ( is2ndPhase ) {
+ long before = allocator.getAllocatedMemory();
+
+ spillAPartition(victimPartition);
+ logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+ // Re-initialize (free memory, then recreate) the partition just spilled/returned
+ reinitPartition(victimPartition);
+
+ // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+ if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
+ int victimPartition2 = chooseAPartitionToFlush(victimPartition);
+ if ( victimPartition2 < 0 ) { return; }
+ long after = allocator.getAllocatedMemory();
+ spillAPartition(victimPartition2);
+ reinitPartition(victimPartition2);
+ logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+ before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+ logger.trace("Second Partition Spilled: {}",victimPartition2);
+ }
+ }
+ else {
+ // 1st phase needs to return a partition early in order to free some memory
+ earlyOutput = true;
+ earlyPartition = victimPartition;
+
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("picked partition {} for early output", victimPartition);
+ }
+ }
+ }
+ }
}
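The bit arithmetic near the top of this method can be made concrete with a small self-contained example. The values below (numPartitions = 8, hence bitsInMask = 3 and partitionMask = 7, and the sample hash code) are assumptions for illustration; the real ones are computed elsewhere in the patch.

  class HashSplitSketch {
    public static void main(String[] args) {
      int numPartitions = 8;                          // assumed; must be a power of two
      int bitsInMask = 3, partitionMask = numPartitions - 1;
      int cycleNum = 1;                               // re-reading a partition that was spilled once already
      int hashCode = 0b10110_101_011;                 // some hash value (low bits shown in groups of 3)
      for (int i = 0; i < cycleNum; i++) {
        hashCode >>>= bitsInMask;                     // skip the bits already used by earlier cycles (011)
      }
      int partition = hashCode & partitionMask;       // next 3 bits choose the partition: 101 = 5
      hashCode >>>= bitsInMask;                       // remaining high bits locate the key in that partition's hash table
      System.out.println("partition = " + partition + ", remaining hash bits = " + hashCode);
    }
  }

Each spill cycle consumes another bitsInMask bits of the hash code, so a spilled partition is re-split into numPartitions fresh sub-partitions on every cycle (secondary, tertiary, ... spilling) until it fits in memory. The memory check further down is plain arithmetic as well: assuming, say, minBatchesPerPartition = 2, plannedBatches = 3, estMaxBatchSize = 8 MB, and MAX_BATCH_SIZE = 64K rows, it reserves roughly 2 * 3 * (8 MB + 64K * 8 bytes), about 51 MB, plus whatever a pending hash-table doubling would need.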
- private void updateStats(HashTable htable) {
- htable.getStats(htStats);
+ /**
+ * Updates the stats after all the input has been read.
+ * Note: For spilled partitions, the hash-table stats from before the spill are lost.
+ * Also SPILLED_PARTITIONS counts only the partitions spilled in the primary cycle, not in the secondary (or later) cycles.
+ * @param htables - the hash tables of all the partitions (their stats are summed)
+ */
+ private void updateStats(HashTable[] htables) {
+ if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
+ long numSpilled = 0;
+ HashTableStats newStats = new HashTableStats();
+ // sum the stats from all the partitions
+ for (int ind = 0; ind < numPartitions; ind++) {
+ htables[ind].getStats(newStats);
+ htStats.addStats(newStats);
+ if (isSpilled(ind)) {
+ numSpilled++;
+ }
+ }
this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
+ this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions);
+ if ( is2ndPhase ) {
+ this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
+ }
+ if ( rowsReturnedEarly > 0 ) {
+ stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
+ (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+ }
}
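As a worked example of the early-return estimate just above (with made-up numbers): 1,500,000 rows returned early at an estimated row width of 70 bytes are reported as round(1,500,000 * 70 / 1024 / 1024), i.e. about 100 MB.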
// Code-generated methods (implemented in HashAggBatch)
- public abstract void doSetup(@Named("incoming") RecordBatch incoming);
+ public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException;
- public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+ public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException;
- public abstract boolean resetValues();
+ public abstract boolean resetValues() throws SchemaChangeException;
}