You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/22 23:40:33 UTC
[1/2] drill git commit: DRILL-5694: Handle OOM in HashAggr by spill
and retry, reserve memory, spinner
Repository: drill
Updated Branches:
refs/heads/master 4c99f0cdd -> d77ab3183
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
index 8f0f770..7dbc9a3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
*/
public class TestHashAggrSpill {
- private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception {
+ private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long fromSpilledPartitions, long toSpilledPartitions) throws Exception {
String plan = client.queryBuilder().sql(sql).explainJson();
QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
@@ -63,7 +63,7 @@ public class TestHashAggrSpill {
long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal());
assertEquals(spillCycle, opCycle);
long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal());
- assertEquals(spilledPartitions, op_spilled_partitions);
+ assertTrue( op_spilled_partitions >= fromSpilledPartitions && op_spilled_partitions <= toSpilledPartitions );
/* assertEquals(3, ops.size());
for ( int i = 0; i < ops.size(); i++ ) {
ProfileParser.OperatorProfile hag = ops.get(i);
@@ -77,134 +77,107 @@ public class TestHashAggrSpill {
}
/**
- * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling
- * ("normal spill" means spill-cycle = 1 )
+ * A template for Hash Aggr spilling tests
*
* @throws Exception
*/
- @Test
- public void testHashAggrSpill() throws Exception {
+ private void testSpill(long maxMem, long numPartitions, long minBatches, int maxParallel, boolean fallback ,boolean predict,
+ String sql, long expectedRows, int cycle, int fromPart, int toPart) throws Exception {
LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
+ .toConsole()
+ //.logger("org.apache.drill.exec.physical.impl.aggregate", Level.INFO)
+ .logger("org.apache.drill", Level.WARN)
+ ;
FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
- .maxParallelization(2)
- .saveProfiles()
- //.keepLocalFiles()
- ;
+ .sessionOption(ExecConstants.HASHAGG_MAX_MEMORY_KEY,maxMem)
+ .sessionOption(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY,numPartitions)
+ .sessionOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_KEY,minBatches)
+ .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
+ .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
+ // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
+ .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, fallback)
+ .sessionOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_KEY,predict)
+
+ .maxParallelization(maxParallel)
+ .saveProfiles()
+ //.keepLocalFiles()
+ ;
+ String sqlStr = sql != null ? sql : // if null then use this default query
+ "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
+
try (LogFixture logs = logBuilder.build();
ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
- runAndDump(client, sql, 1_200_000, 1, 1);
+ runAndDump(client, sqlStr, expectedRows, cycle, fromPart,toPart);
}
}
-
/**
- * Test "secondary" spilling -- Some of the spilled partitions cause more spilling as they are read back
- * (Hence spill-cycle = 2 )
+ * Test "normal" spilling: Only 2 (or 3) partitions (out of 4) would require spilling
+ * ("normal spill" means spill-cycle = 1 )
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testSimpleHashAggrSpill() throws Exception {
+ testSpill(68_000_000, 16, 2, 2, false, true, null,
+ 1_200_000, 1,2, 3
+ );
+ }
+ /**
+ * Test with "needed memory" prediction turned off
+ * (i.e., do exercise code paths that catch OOMs from the Hash Table and recover)
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testNoPredictHashAggrSpill() throws Exception {
+ testSpill(58_000_000, 16, 2, 2, false,false /* no prediction */,
+ null,1_200_000, 1,1, 1
+ );
+ }
+ /**
+ * Test Secondary and Tertiary spill cycles - Happens when some of the spilled partitions cause more spilling as they are read back
*
* @throws Exception
*/
@Test
public void testHashAggrSecondaryTertiarySpill() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- .logger("org.apache.drill.exec.cache", Level.INFO)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false)
- // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true)
- .maxParallelization(1)
- .saveProfiles()
- //.keepLocalFiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i";
- runAndDump(client, sql, 1_100_000, 3, 2);
- }
+ testSpill(58_000_000, 16, 3, 1, false,true,
+ "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i",
+ 1_100_000, 3,2, 2
+ );
}
-
/**
- * Test when memory limit is set to very low value even to hold one partition in 2 Phase Hash Agg scenario and
- * fallback mechanism to use unbounded memory is disabled then Query Fails in HashAgg with Resource Error.
+ * Test with the "fallback" option disabled: When not enough memory is available to allow spilling, the query fails (Resource error).
+ *
* @throws Exception
*/
@Test
public void testHashAggrFailWithFallbackDisabed() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .maxParallelization(2)
- .saveProfiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
- QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run();
- fail();
+ try {
+ testSpill(34_000_000, 4, 5, 2, false /* no fallback */, true, null,
+ 1_200_000, 0 /* no spill due to fallback to pre-1.11 */, 0, 0);
+ fail(); // in case the above test did not throw
} catch (Exception ex) {
assertTrue(ex instanceof UserRemoteException);
assertTrue(((UserRemoteException)ex).getErrorType() == UserBitShared.DrillPBError.ErrorType.RESOURCE);
+ // must get here for the test to succeed ...
}
}
-
/**
- * Test when memory limit is set to very low value even to hold one partition in 2 Phase Hash Agg scenario and
- * fallback mechanism to use unbounded memory is enabled then query completes successfully without spilling.
+ * Test with the "fallback" option ON: When not enough memory is available to allow spilling (internally need enough memory to
+ * create multiple partitions), then behave like the pre-1.11 Hash Aggregate: Allocate unlimited memory, no spill.
*
* @throws Exception
*/
@Test
public void testHashAggrSuccessWithFallbackEnabled() throws Exception {
- LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder()
- .toConsole()
- .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG)
- ;
- FixtureBuilder builder = ClusterFixture.builder()
- .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000)
- .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,4)
- .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3)
- .sessionOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY, true)
- .configProperty(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, false)
- .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true)
- .maxParallelization(2)
- .saveProfiles()
- ;
- try (LogFixture logs = logBuilder.build();
- ClusterFixture cluster = builder.build();
- ClientFixture client = cluster.clientFixture()) {
- String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i";
- runAndDump(client, sql, 1_200_000, 0, 0);
- } catch (Exception ex) {
- fail();
- }
+ testSpill(34_000_000, 4, 5, 2, true /* do fallback */,true, null,
+ 1_200_000, 0 /* no spill due to fallback to pre-1.11 */,0, 0
+ );
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
index d2ff805..e8db1e8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/managed/TestSortImpl.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.ops.OperExecContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.physical.impl.xsort.managed.SortImpl.SortResults;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.record.BatchSchema;
@@ -91,15 +90,8 @@ public class TestSortImpl extends DrillTest {
.setQueryId(queryId)
.build();
SortConfig sortConfig = new SortConfig(opContext.getConfig());
- DrillbitEndpoint ep = DrillbitEndpoint.newBuilder()
- .setAddress("foo.bar.com")
- .setUserPort(1234)
- .setControlPort(1235)
- .setDataPort(1236)
- .setVersion("1.11")
- .build();
- SpillSet spillSet = new SpillSet(opContext.getConfig(), handle,
- popConfig, ep);
+
+ SpillSet spillSet = new SpillSet(opContext.getConfig(), handle, popConfig);
PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
return new SortImpl(opContext, sortConfig, spilledRuns, outputBatch);
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
----------------------------------------------------------------------
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 4e17eda..e97493e 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -198,11 +198,11 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
private static String createErrorMsg(final BufferAllocator allocator, final int rounded, final int requested) {
if (rounded != requested) {
return String.format(
- "Unable to allocate buffer of size %d (rounded from %d) due to memory limit. Current allocation: %d",
- rounded, requested, allocator.getAllocatedMemory());
+ "Unable to allocate buffer of size %d (rounded from %d) due to memory limit (%d). Current allocation: %d",
+ rounded, requested, allocator.getLimit(), allocator.getAllocatedMemory());
} else {
- return String.format("Unable to allocate buffer of size %d due to memory limit. Current allocation: %d",
- rounded, allocator.getAllocatedMemory());
+ return String.format("Unable to allocate buffer of size %d due to memory limit (%d). Current allocation: %d",
+ rounded, allocator.getLimit(), allocator.getAllocatedMemory());
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 4418212..21959f6 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -198,7 +198,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
try {
values.allocateNew(totalBytes, valueCount);
bits.allocateNew(valueCount);
- } catch(DrillRuntimeException e) {
+ } catch(RuntimeException e) {
clear();
throw e;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index 4527da8..e5432da 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -362,7 +362,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
try {
data = allocator.buffer(totalBytes);
offsetVector.allocateNew(valueCount + 1);
- } catch (DrillRuntimeException e) {
+ } catch (RuntimeException e) {
clear();
throw e;
}
[2/2] drill git commit: DRILL-5694: Handle OOM in HashAggr by spill
and retry, reserve memory, spinner
Posted by pr...@apache.org.
DRILL-5694: Handle OOM in HashAggr by spill and retry, reserve memory, spinner
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d77ab318
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d77ab318
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d77ab318
Branch: refs/heads/master
Commit: d77ab31836e5e5b88c124e0c7540bccd0544dedb
Parents: 4c99f0c
Author: Ben-Zvi <bb...@mapr.com>
Authored: Wed Sep 20 14:39:21 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Thu Sep 21 18:35:08 2017 -0700
----------------------------------------------------------------------
.../exceptions/RetryAfterSpillException.java | 26 ++
.../drill/common/exceptions/UserException.java | 36 ++
.../org/apache/drill/exec/ExecConstants.java | 12 +-
.../physical/impl/aggregate/HashAggBatch.java | 11 +-
.../impl/aggregate/HashAggTemplate.java | 369 +++++++++++++------
.../physical/impl/aggregate/HashAggregator.java | 6 +-
.../impl/aggregate/SpilledRecordbatch.java | 12 +-
.../physical/impl/common/ChainedHashTable.java | 1 +
.../exec/physical/impl/common/HashTable.java | 3 +-
.../physical/impl/common/HashTableTemplate.java | 129 ++++---
.../exec/physical/impl/join/HashJoinBatch.java | 6 +-
.../physical/impl/spill/RecordBatchSizer.java | 2 +-
.../exec/physical/impl/spill/SpillSet.java | 26 +-
.../impl/xsort/managed/ExternalSortBatch.java | 3 +-
.../server/options/SystemOptionManager.java | 1 +
.../src/main/resources/drill-module.conf | 29 +-
.../physical/impl/agg/TestHashAggrSpill.java | 163 ++++----
.../impl/xsort/managed/TestSortImpl.java | 12 +-
.../apache/drill/exec/memory/BaseAllocator.java | 8 +-
.../codegen/templates/NullableValueVectors.java | 2 +-
.../templates/VariableLengthVectors.java | 2 +-
21 files changed, 523 insertions(+), 336 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java b/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
new file mode 100644
index 0000000..8fd5a95
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/exceptions/RetryAfterSpillException.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.exceptions;
+
+/**
+ * A special exception to be caught by caller, who is supposed to free memory by spilling and try again
+ *
+ */
+public class RetryAfterSpillException extends Exception {
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
index dd4fd36..4ea97e5 100644
--- a/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
+++ b/common/src/main/java/org/apache/drill/common/exceptions/UserException.java
@@ -22,6 +22,12 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.slf4j.Logger;
+import java.io.File;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.lang.management.ManagementFactory;
+
+import static java.lang.Thread.sleep;
/**
* Base class for all user exception. The goal is to separate out common error conditions where we can give users
* useful feedback.
@@ -536,6 +542,36 @@ public class UserException extends DrillRuntimeException {
* @return user exception
*/
public UserException build(final Logger logger) {
+
+ // To allow for debugging:
+ //
+ // Spinner code that pauses execution here for as long as the file '/tmp/drill/spin' exists.
+ // Can be used to attach a debugger, use jstack, etc.
+ // (Run "clush -a touch /tmp/drill/spin" to turn this on across all the cluster nodes; to
+ // release the spinning threads, run "clush -a rm /tmp/drill/spin".)
+ // The name of the spinning process (along with the error message) can then be found
+ // in a file like /tmp/drill/spin4148663301172491613.tmp
+ File spinFile = new File("/tmp/drill/spin");
+ if ( spinFile.exists() ) {
+ File tmpDir = new File("/tmp/drill");
+ File outErr = null;
+ try {
+ outErr = File.createTempFile("spin", ".tmp", tmpDir);
+ BufferedWriter bw = new BufferedWriter(new FileWriter(outErr));
+ bw.write("Spinning process: " + ManagementFactory.getRuntimeMXBean().getName()
+ /* After upgrading to JDK 9 - replace with: ProcessHandle.current().getPid() */);
+ bw.write("\nError cause: " +
+ (errorType == DrillPBError.ErrorType.SYSTEM ? ("SYSTEM ERROR: " + ErrorHelper.getRootMessage(cause)) : message));
+ bw.close();
+ } catch (Exception ex) {
+ logger.warn("Failed creating a spinner tmp message file: {}", ex);
+ }
+ while (spinFile.exists()) {
+ try { sleep(1_000); } catch (Exception ex) { /* ignore interruptions */ }
+ }
+ try { outErr.delete(); } catch (Exception ex) { } // cleanup - remove err msg file
+ }
+
if (uex != null) {
return uex;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 4fa846f..2b32569 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -93,18 +93,20 @@ public interface ExecConstants {
// Hash Aggregate Options
- String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128); // 1 means - no spilling
- String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE);
// min batches is used for tuning (each partition needs so many batches when planning the number of partitions,
// or reserve this number when calculating whether the remaining available memory is too small and requires a spill.)
// Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
- String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
- String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
- LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5);
+ String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
+ LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 1, 5);
+ // Can be turned off mainly for testing. Memory prediction is used to decide on when to spill to disk; with this option off,
+ // spill would be triggered only by another mechanism -- "catch OOMs and then spill".
+ String HASHAGG_USE_MEMORY_PREDICTION_KEY = "exec.hashagg.use_memory_prediction";
+ BooleanValidator HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR = new BooleanValidator(HASHAGG_USE_MEMORY_PREDICTION_KEY);
+
String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";
String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled";
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 97e0599..314cf6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.IfExpression;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.logical.data.NamedExpression;
@@ -251,6 +252,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
groupByOutFieldIds[i] = container.add(vv);
}
+ int extraNonNullColumns = 0; // each of SUM, MAX and MIN gets an extra bigint column
for (i = 0; i < numAggrExprs; i++) {
NamedExpression ne = popConfig.getAggrExprs().get(i);
final LogicalExpression expr =
@@ -268,6 +270,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
continue;
}
+ if ( expr instanceof FunctionHolderExpression ) {
+ String funcName = ((FunctionHolderExpression) expr).getName();
+ if ( funcName.equals("sum") || funcName.equals("max") || funcName.equals("min") ) {extraNonNullColumns++;}
+ }
final MaterializedField outputField = MaterializedField.create(ne.getRef().getAsNamePart().getName(), expr.getMajorType());
@SuppressWarnings("resource")
ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
@@ -288,12 +294,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
- agg.setup(popConfig, htConfig, context, this.stats,
- oContext, incoming, this,
+ agg.setup(popConfig, htConfig, context, oContext, incoming, this,
aggrExprs,
cgInner.getWorkspaceTypes(),
groupByOutFieldIds,
- this.container);
+ this.container, extraNonNullColumns * 8 /* sizeof(BigInt) */);
return agg;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index a3b1ceb..1e65c49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import com.google.common.base.Stopwatch;
-
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
@@ -60,8 +60,6 @@ import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.planner.physical.AggPrelBase;
-import org.apache.drill.exec.proto.UserBitShared;
-
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
@@ -109,14 +107,21 @@ public abstract class HashAggTemplate implements HashAggregator {
private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
private boolean is2ndPhase = false;
- private boolean canSpill = true; // make it false in case can not spill
+ private boolean is1stPhase = false;
+ private boolean canSpill = true; // make it false in case can not spill/return-early
private ChainedHashTable baseHashTable;
private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
private int earlyPartition = 0; // which partition to return early
-
- private long memoryLimit; // max memory to be used by this oerator
- private long estMaxBatchSize = 0; // used for adjusting #partitions
- private long estRowWidth = 0;
+ private boolean retrySameIndex = false; // in case put failed during 1st phase - need to output early, then retry
+ private boolean useMemoryPrediction = false; // whether to use memory prediction to decide when to spill
+ private long estMaxBatchSize = 0; // used for adjusting #partitions and deciding when to spill
+ private long estRowWidth = 0; // the size of the internal "row" (keys + values + extra columns)
+ private long estValuesRowWidth = 0; // the size of the internal values ( values + extra )
+ private long estOutputRowWidth = 0; // the size of the output "row" (no extra columns)
+ private long estValuesBatchSize = 0; // used for "reserving" memory for the Values batch to overcome an OOM
+ private long estOutgoingAllocSize = 0; // used for "reserving" memory for the Outgoing Output Values to overcome an OOM
+ private long reserveValueBatchMemory; // keep "reserve memory" for Values Batch
+ private long reserveOutgoingMemory; // keep "reserve memory" for the Outgoing (Values only) output
private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars
private long minBatchesPerPartition; // for tuning - num partitions and spill decision
private long plannedBatches = 0; // account for planned, but not yet allocated batches
@@ -297,10 +302,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@Override
- public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
- OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds,
- VectorContainer outContainer) throws SchemaChangeException, IOException {
+ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
@@ -310,25 +312,27 @@ public abstract class HashAggTemplate implements HashAggregator {
}
this.context = context;
- this.stats = stats;
+ this.stats = oContext.getStats();
this.allocator = oContext.getAllocator();
this.oContext = oContext;
this.incoming = incoming;
this.outgoing = outgoing;
this.outContainer = outContainer;
this.operatorId = hashAggrConfig.getOperatorId();
+ this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
+ is1stPhase = isTwoPhase && ! is2ndPhase ;
canSpill = isTwoPhase; // single phase can not spill
// Typically for testing - force a spill after a partition has more than so many batches
- minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION);
+ minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
// Set the memory limit
- memoryLimit = allocator.getLimit();
+ long memoryLimit = allocator.getLimit();
// Optional configured memory limit, typically used only for testing.
- long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY);
+ long configLimit = context.getOptions().getOption(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR);
if (configLimit > 0) {
logger.warn("Memory limit was changed to {}",configLimit);
memoryLimit = Math.min(memoryLimit, configLimit);
@@ -370,6 +374,10 @@ public abstract class HashAggTemplate implements HashAggregator {
this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill)
numGroupByOutFields = groupByOutFieldIds.length;
+ // Start calculating the row widths (with the extra columns; the rest would be done in updateEstMaxBatchSize() )
+ estRowWidth = extraRowBytes;
+ estValuesRowWidth = extraRowBytes;
+
doSetup(incoming);
}
@@ -382,19 +390,25 @@ public abstract class HashAggTemplate implements HashAggregator {
final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
// Set the number of partitions from the configuration (raise to a power of two, if needed)
- numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS);
- if ( numPartitions == 1 ) {
+ numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
+ if ( numPartitions == 1 && is2ndPhase ) { // 1st phase can still do early return with 1 partition
canSpill = false;
logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
}
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
- if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch
+ if ( schema == null ) { estValuesBatchSize = estOutgoingAllocSize = estMaxBatchSize = 0; } // incoming was an empty batch
else {
// Estimate the max batch size; should use actual data (e.g. lengths of varchars)
updateEstMaxBatchSize(incoming);
}
- long memAvail = memoryLimit - allocator.getAllocatedMemory();
+ // create "reserved memory" and adjust the memory limit down
+ reserveValueBatchMemory = reserveOutgoingMemory = estValuesBatchSize ;
+ long newMemoryLimit = allocator.getLimit() - reserveValueBatchMemory - reserveOutgoingMemory ;
+ long memAvail = newMemoryLimit - allocator.getAllocatedMemory();
+ if ( memAvail <= 0 ) { throw new OutOfMemoryException("Too little memory available"); }
+ allocator.setLimit(newMemoryLimit);
+
if ( !canSpill ) { // single phase, or spill disabled by configuation
numPartitions = 1; // single phase should use only a single partition (to save memory)
} else { // two phase
@@ -464,6 +478,8 @@ public abstract class HashAggTemplate implements HashAggregator {
}
this.batchHolders[i] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
}
+ // Initialize the value vectors in the generated code (which point to the incoming or outgoing fields)
+ try { htables[0].updateBatches(); } catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); };
}
/**
* get new incoming: (when reading spilled files like an "incoming")
@@ -500,22 +516,45 @@ public abstract class HashAggTemplate implements HashAggregator {
*/
private void updateEstMaxBatchSize(RecordBatch incoming) {
if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
+ // Use the sizer to get the input row width and the length of the longest varchar column
RecordBatchSizer sizer = new RecordBatchSizer(incoming);
logger.trace("Incoming sizer: {}",sizer);
// An empty batch only has the schema, can not tell actual length of varchars
// else use the actual varchars length, each capped at 50 (to match the space allocation)
- estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
- estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+ long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
// Get approx max (varchar) column width to get better memory allocation
- maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+ maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
- logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
- isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+ //
+ // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
+ // (which is used to decide when to spill)
+ // Also calculate the values batch size (used as a reserve to overcome an OOM)
+ //
+ Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+ int fieldId = 0;
+ while (outgoingIter.hasNext()) {
+ ValueVector vv = outgoingIter.next().getValueVector();
+ MaterializedField mr = vv.getField();
+ int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth :
+ TypeHelper.getSize(mr.getType());
+ estRowWidth += fieldSize;
+ estOutputRowWidth += fieldSize;
+ if ( fieldId < numGroupByOutFields ) { fieldId++; }
+ else { estValuesRowWidth += fieldSize; }
+ }
+ // multiply by the max number of rows in a batch to get the final estimated max size
+ estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
+ // (When there are no aggr functions, use '1' as later code relies on this size being non-zero)
+ estValuesBatchSize = Math.max(estValuesRowWidth, 1) * MAX_BATCH_SIZE;
+ estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
+
+ logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {} memory limit: {} max column width: {}",
+ isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
- if ( estMaxBatchSize > memoryLimit ) {
- logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit);
+ if ( estMaxBatchSize > allocator.getLimit() ) {
+ logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,allocator.getLimit());
}
}
@@ -545,16 +584,19 @@ public abstract class HashAggTemplate implements HashAggregator {
if (EXTRA_DEBUG_1) {
logger.debug("Starting outer loop of doWork()...");
}
- for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
+ while (underlyingIndex < currentBatchRecordCount) {
if (EXTRA_DEBUG_2) {
logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
}
checkGroupAndAggrValues(currentIndex);
+
+ if ( retrySameIndex ) { retrySameIndex = false; } // need to retry this row (e.g. we had an OOM)
+ else { incIndex(); } // next time continue with the next incoming row
+
// If adding a group discovered a memory pressure during 1st phase, then start
// outputing some partition downstream in order to free memory.
if ( earlyOutput ) {
outputCurrentBatch();
- incIndex(); // next time continue with the next incoming row
return AggOutcome.RETURN_OUTCOME;
}
}
@@ -573,9 +615,8 @@ public abstract class HashAggTemplate implements HashAggregator {
// collecting stats on the spilled records)
//
long memAllocBeforeNext = allocator.getAllocatedMemory();
-
if ( handlingSpills ) {
- outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
+ outcome = incoming.next(); // get it from the SpilledRecordBatch
} else {
// Get the next RecordBatch from the incoming (i.e. upstream operator)
outcome = outgoing.next(0, incoming);
@@ -646,6 +687,46 @@ public abstract class HashAggTemplate implements HashAggregator {
}
/**
+ * Use reserved values memory (if available) to try and preempt an OOM
+ */
+ private void useReservedValuesMemory() {
+ // try to preempt an OOM by using the reserved memory
+ long reservedMemory = reserveValueBatchMemory;
+ if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+
+ reserveValueBatchMemory = 0;
+ }
+ /**
+ * Use reserved outgoing output memory (if available) to try and preempt an OOM
+ */
+ private void useReservedOutgoingMemory() {
+ // try to preempt an OOM by using the reserved memory
+ long reservedMemory = reserveOutgoingMemory;
+ if ( reservedMemory > 0 ) { allocator.setLimit(allocator.getLimit() + reservedMemory); }
+
+ reserveOutgoingMemory = 0;
+ }
+ /**
+ * Restore the reserve memory (both)
+ *
+ */
+ private void restoreReservedMemory() {
+ if ( 0 == reserveOutgoingMemory ) { // always restore OutputValues first (needed for spilling)
+ long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
+ if ( memAvail > estOutgoingAllocSize) {
+ allocator.setLimit(allocator.getLimit() - estOutgoingAllocSize);
+ reserveOutgoingMemory = estOutgoingAllocSize;
+ }
+ }
+ if ( 0 == reserveValueBatchMemory ) {
+ long memAvail = allocator.getLimit() - allocator.getAllocatedMemory();
+ if ( memAvail > estValuesBatchSize) {
+ allocator.setLimit(allocator.getLimit() - estValuesBatchSize);
+ reserveValueBatchMemory = estValuesBatchSize;
+ }
+ }
+ }
+ /**
* Allocate space for the returned aggregate columns
* (Note DRILL-5588: Maybe can eliminate this allocation (and copy))
* @param records
@@ -657,12 +738,25 @@ public abstract class HashAggTemplate implements HashAggregator {
for (int i = 0; i < numGroupByOutFields; i++) {
outgoingIter.next();
}
+
+ // try to preempt an OOM by using the reserved memory
+ useReservedOutgoingMemory();
+ long allocatedBefore = allocator.getAllocatedMemory();
+
while (outgoingIter.hasNext()) {
@SuppressWarnings("resource")
ValueVector vv = outgoingIter.next().getValueVector();
AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0);
}
+
+ long memAdded = allocator.getAllocatedMemory() - allocatedBefore;
+ if ( memAdded > estOutgoingAllocSize ) {
+ logger.trace("Output values allocated {} but the estimate was only {}. Adjusting ...",memAdded,estOutgoingAllocSize);
+ estOutgoingAllocSize = memAdded;
+ }
+ // try to restore the reserve
+ restoreReservedMemory();
}
@Override
@@ -738,6 +832,9 @@ public abstract class HashAggTemplate implements HashAggregator {
batchHolders[part].clear();
}
batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
+
+ // in case the reserve memory was used, try to restore
+ restoreReservedMemory();
}
private final void incIndex() {
@@ -751,7 +848,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
private final void resetIndex() {
- underlyingIndex = -1;
+ underlyingIndex = -1; // will become 0 in incIndex()
incIndex();
}
@@ -768,10 +865,11 @@ public abstract class HashAggTemplate implements HashAggregator {
* Need to weigh the above three options.
*
* @param currPart - The partition that hit the memory limit (gets a priority)
- * @return The partition (number) chosen to be spilled
+ * @param tryAvoidCurr - When true, give negative priority to the current partition
+ * @return The partition (number) chosen to be spilled
*/
- private int chooseAPartitionToFlush(int currPart) {
- if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+ private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
+ if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
int currPartSize = batchHolders[currPart].size();
if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
// first find the largest spilled partition
@@ -784,7 +882,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
// Give the current (if already spilled) some priority
- if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+ if ( ! tryAvoidCurr && isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
maxSizeSpilled = currPartSize ;
indexMaxSpilled = currPart;
}
@@ -803,7 +901,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
// again - priority to the current partition
- if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+ if ( ! tryAvoidCurr && ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
return currPart;
}
if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch!
@@ -898,7 +996,6 @@ public abstract class HashAggTemplate implements HashAggregator {
BatchHolder bh = newBatchHolder();
batchHolders[part].add(bh);
-
if (EXTRA_DEBUG_1) {
logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
}
@@ -1117,7 +1214,8 @@ public abstract class HashAggTemplate implements HashAggregator {
errmsg = "Too little memory available to operator to facilitate spilling.";
} else { // a bug ?
errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions +
- ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches;
+ ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
+ if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
}
errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". ";
@@ -1129,10 +1227,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// The htIdxHolder contains the index of the group in the hash table container; this same
// index is also used for the aggregation values maintained by the hash aggregate.
private void checkGroupAndAggrValues(int incomingRowIdx) {
- if (incomingRowIdx < 0) {
- throw new IllegalArgumentException("Invalid incoming row index.");
- }
-
+ assert incomingRowIdx >= 0;
assert ! earlyOutput;
/** for debugging
@@ -1165,7 +1260,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// partition to use, and the higher bits determine the location in the hash table.
int hashCode;
try {
- htables[0].updateBatches();
+ // htables[0].updateBatches();
hashCode = htables[0].getHashCode(incomingRowIdx);
} catch (SchemaChangeException e) {
throw new UnsupportedOperationException("Unexpected schema change", e);
@@ -1179,19 +1274,45 @@ public abstract class HashAggTemplate implements HashAggregator {
HashTable.PutStatus putStatus = null;
long allocatedBeforeHTput = allocator.getAllocatedMemory();
+ // Proactive spill - in case there is no reserve memory - spill and retry putting later
+ if ( reserveValueBatchMemory == 0 && canSpill ) {
+ logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.",
+ is1stPhase ? "early return" : "spill");
+
+ doSpill(currentPartition); // spill to free some memory
+
+ retrySameIndex = true;
+ return; // to retry this put()
+ }
+
// ==========================================
// Insert the key columns into the hash table
// ==========================================
try {
+
putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode);
+
+ } catch (RetryAfterSpillException re) {
+ if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
+
+ logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.",
+ is1stPhase ? "early return" : "spill");
+
+ // for debugging - in case there's a leak
+ long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
+ if ( memDiff > 0 ) { logger.warn("Leak: HashTable put() OOM left behind {} bytes allocated",memDiff); }
+
+ doSpill(currentPartition); // spill to free some memory
+
+ retrySameIndex = true;
+ return; // to retry this put()
} catch (OutOfMemoryException exc) {
- throw new OutOfMemoryException(getOOMErrorMsg("HT was: " + allocatedBeforeHTput), exc); // may happen when can not spill
+ throw new OutOfMemoryException(getOOMErrorMsg("HT was: " + allocatedBeforeHTput), exc);
} catch (SchemaChangeException e) {
- throw new UnsupportedOperationException("Unexpected schema change", e);
+ throw new UnsupportedOperationException("Unexpected schema change", e);
}
-
- boolean needToCheckIfSpillIsNeeded = false;
long allocatedBeforeAggCol = allocator.getAllocatedMemory();
+ boolean needToCheckIfSpillIsNeeded = allocatedBeforeAggCol > allocatedBeforeHTput ;
// Add an Aggr batch if needed:
//
@@ -1201,32 +1322,38 @@ public abstract class HashAggTemplate implements HashAggregator {
if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) {
try {
- addBatchHolder(currentPartition);
+ useReservedValuesMemory(); // try to preempt an OOM by using the reserve
+
+ addBatchHolder(currentPartition); // allocate a new (internal) values batch
+
+ restoreReservedMemory(); // restore the reserve, if possible
+ // A reason to check for a spill - In case restore-reserve failed
+ needToCheckIfSpillIsNeeded = ( 0 == reserveValueBatchMemory );
if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
long totalAddedMem = allocator.getAllocatedMemory() - allocatedBeforeHTput;
- logger.trace("MEMORY CHECK AGG: allocated now {}, added {} total (with HT) added {}", allocator.getAllocatedMemory(),
- allocator.getAllocatedMemory() - allocatedBeforeAggCol, totalAddedMem);
- // resize the batch estimate if needed (e.g., varchars may take more memory than estimated)
+ long aggValuesAddedMem = allocator.getAllocatedMemory() - allocatedBeforeAggCol;
+ logger.trace("MEMORY CHECK AGG: allocated now {}, added {}, total (with HT) added {}", allocator.getAllocatedMemory(),
+ aggValuesAddedMem, totalAddedMem);
+ // resize the batch estimates if needed (e.g., varchars may take more memory than estimated)
if (totalAddedMem > estMaxBatchSize) {
logger.trace("Adjusting Batch size estimate from {} to {}", estMaxBatchSize, totalAddedMem);
estMaxBatchSize = totalAddedMem;
- needToCheckIfSpillIsNeeded = true; // better check the memory limits again now
+ needToCheckIfSpillIsNeeded = true;
+ }
+ if (aggValuesAddedMem > estValuesBatchSize) {
+ logger.trace("Adjusting Values Batch size from {} to {}",estValuesBatchSize, aggValuesAddedMem);
+ estValuesBatchSize = aggValuesAddedMem;
+ needToCheckIfSpillIsNeeded = true;
}
} catch (OutOfMemoryException exc) {
- throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc); // may happen when can not spill
+ throw new OutOfMemoryException(getOOMErrorMsg("AGGR"), exc);
}
} else if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST ) {
// If a batch just became full (i.e. another batch would be allocated soon) -- then need to
// check (later, see below) if the memory limits are too close, and if so -- then spill !
plannedBatches++; // planning to allocate one more batch
needToCheckIfSpillIsNeeded = true;
- } else if ( allocatedBeforeAggCol > allocatedBeforeHTput ) {
- // if HT put() allocated memory (other than a new batch; e.g. HT doubling, or buffer resizing)
- // then better check again whether a spill is needed
- needToCheckIfSpillIsNeeded = true;
-
- logger.trace("MEMORY CHECK HT: was allocated {} added {} partition {}",allocatedBeforeHTput, allocatedBeforeAggCol - allocatedBeforeHTput,currentPartition);
}
// =================================================================
@@ -1240,67 +1367,83 @@ public abstract class HashAggTemplate implements HashAggregator {
}
// ===================================================================================
- // If the last batch just became full - that is the time to check the memory limits !!
- // If exceeded, then need to spill (if 2nd phase) or output early (1st)
- // (Skip this if cannot spill; in such case an OOM may be encountered later)
+ // If the last batch just became full, or other "memory growing" events happened, then
+ // this is the time to check the memory limits !!
+ // If the limits were exceeded, then need to spill (if 2nd phase) or output early (1st)
+ // (Skip this if cannot spill, or not checking memory limits; in such case an OOM may
+ // be encountered later - and some OOM cases are recoverable by spilling and retrying)
// ===================================================================================
- if ( needToCheckIfSpillIsNeeded && canSpill ) {
+ if ( needToCheckIfSpillIsNeeded && canSpill && useMemoryPrediction ) {
+ spillIfNeeded(currentPartition);
+ }
+ }
- // calculate the (max) new memory needed now
- // Plan ahead for at least MIN batches
- long maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) *
- ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) );
+ private void spillIfNeeded(int currentPartition) { spillIfNeeded(currentPartition, false);}
+ private void doSpill(int currentPartition) { spillIfNeeded(currentPartition, true);}
+ /**
+ * Spill (or return early, if 1st phase) if too little available memory is left
+ * @param currentPartition - the preferred candidate for spilling
+ * @param forceSpill -- spill unconditionally (no memory checks)
+ */
+ private void spillIfNeeded(int currentPartition, boolean forceSpill) {
+ long maxMemoryNeeded = 0;
+ if ( !forceSpill ) { // need to check the memory in order to decide
+ // calculate the (max) new memory needed now; plan ahead for at least MIN batches
+ maxMemoryNeeded = minBatchesPerPartition * Math.max(1, plannedBatches) * (estMaxBatchSize + MAX_BATCH_SIZE * (4 + 4 /* links + hash-values */));
// Add the (max) size of the current hash table, in case it will double
int maxSize = 1;
- for ( int insp = 0; insp < numPartitions; insp++) { maxSize = Math.max(maxSize, batchHolders[insp].size()); }
+ for (int insp = 0; insp < numPartitions; insp++) {
+ maxSize = Math.max(maxSize, batchHolders[insp].size());
+ }
maxMemoryNeeded += MAX_BATCH_SIZE * 2 * 2 * 4 * maxSize; // 2 - double, 2 - max when %50 full, 4 - Uint4
// log a detailed debug message explaining why a spill may be needed
- logger.debug("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " +
- "Max memory needed {}, Est batch size {}, mem limit {}",
- allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition,
- batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit);
- //
- // Spill if the allocated memory plus the memory needed exceeds the memory limit.
- //
- if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
-
- // Pick a "victim" partition to spill or return
- int victimPartition = chooseAPartitionToFlush(currentPartition);
-
- // In case no partition has more than one batch -- try and "push the limits"; maybe next
- // time the spill could work.
- if ( victimPartition < 0 ) { return; }
-
- if ( is2ndPhase ) {
- long before = allocator.getAllocatedMemory();
-
- spillAPartition(victimPartition);
- logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
-
- // Re-initialize (free memory, then recreate) the partition just spilled/returned
- reinitPartition(victimPartition);
-
- // in some "edge" cases (e.g. testing), spilling one partition may not be enough
- if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) {
- int victimPartition2 = chooseAPartitionToFlush(victimPartition);
- if ( victimPartition2 < 0 ) { return; }
- long after = allocator.getAllocatedMemory();
- spillAPartition(victimPartition2);
- reinitPartition(victimPartition2);
- logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
- before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
- logger.trace("Second Partition Spilled: {}",victimPartition2);
- }
+ logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + "Max memory needed {}, Est batch size {}, mem limit {}",
+ allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : "1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded,
+ estMaxBatchSize, allocator.getLimit());
+ }
+ //
+ // Spill if (forced, or) the allocated memory plus the memory needed exceed the memory limit.
+ //
+ if ( forceSpill || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+
+ // Pick a "victim" partition to spill or return
+ int victimPartition = chooseAPartitionToFlush(currentPartition, forceSpill);
+
+ // In case no partition has more than one batch -- try and "push the limits"; maybe next
+ // time the spill could work.
+ if ( victimPartition < 0 ) { return; }
+
+ if ( is2ndPhase ) {
+ long before = allocator.getAllocatedMemory();
+
+ spillAPartition(victimPartition);
+ logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition);
+
+ // Re-initialize (free memory, then recreate) the partition just spilled/returned
+ reinitPartition(victimPartition);
+
+ // In case spilling did not free enough memory to recover the reserves
+ boolean spillAgain = reserveOutgoingMemory == 0 || reserveValueBatchMemory == 0;
+ // in some "edge" cases (e.g. testing), spilling one partition may not be enough
+ if ( spillAgain || allocator.getAllocatedMemory() + maxMemoryNeeded > allocator.getLimit() ) {
+ int victimPartition2 = chooseAPartitionToFlush(victimPartition, true);
+ if ( victimPartition2 < 0 ) { return; }
+ long after = allocator.getAllocatedMemory();
+ spillAPartition(victimPartition2);
+ reinitPartition(victimPartition2);
+ logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}",
+ before, after, allocator.getAllocatedMemory(), maxMemoryNeeded);
+ logger.trace("Second Partition Spilled: {}",victimPartition2);
}
- else {
- // 1st phase need to return a partition early in order to free some memory
- earlyOutput = true;
- earlyPartition = victimPartition;
+ }
+ else {
+ // 1st phase need to return a partition early in order to free some memory
+ earlyOutput = true;
+ earlyPartition = victimPartition;
- if ( EXTRA_DEBUG_SPILL ) {
- logger.debug("picked partition {} for early output", victimPartition);
- }
+ if ( EXTRA_DEBUG_SPILL ) {
+ logger.debug("picked partition {} for early output", victimPartition);
}
}
}
@@ -1335,7 +1478,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
if ( rowsReturnedEarly > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early
- (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0));
+ (int) Math.round( rowsReturnedEarly * estOutputRowWidth / 1024.0D / 1024.0));
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 21d5a4a..16b5499 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.record.RecordBatch;
@@ -47,10 +46,7 @@ public interface HashAggregator {
// OK - batch returned, NONE - end of data, RESTART - call again
public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
- public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context,
- OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
- LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds,
- VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException;
+ public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
public abstract IterOutcome getOutcome();
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index b05353e..ac4b29d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -64,7 +64,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
try {
this.spillStream = this.spillSet.openForInput(spillFile);
- } catch (IOException e) { throw new RuntimeException(e);}
+ } catch (IOException e) {
+ throw UserException.resourceError(e).build(HashAggBatch.logger);
+ }
next(); // initialize the container
}
@@ -124,6 +126,8 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
@Override
public IterOutcome next() {
+ if ( ! context.shouldContinue() ) { return IterOutcome.STOP; }
+
if ( spilledBatches <= 0 ) { // no more batches to read in this partition
this.close();
return IterOutcome.NONE;
@@ -155,6 +159,9 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
return IterOutcome.OK;
}
+ /**
+ * Note: ignoring any IO errors (e.g. file not found)
+ */
@Override
public void close() {
container.clear();
@@ -167,9 +174,8 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
spillSet.delete(spillFile);
}
catch (IOException e) {
- throw new RuntimeException(e);
+ /* ignore */
} finally {
- spillSet.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 387dad1..db9622f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -141,6 +141,7 @@ public class ChainedHashTable {
// This code is called from generated code, so to step into this code,
// persist the code generated in HashAggBatch also.
// top.saveCodeForDebugging(true);
+ top.preferPlainJava(true); // use a subclass
ClassGenerator<HashTable> cg = top.getRoot();
ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 1e6570f..3749e3e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
public interface HashTable {
@@ -58,7 +59,7 @@ public interface HashTable {
public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException;
+ public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 336026c..7a086aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -25,6 +25,7 @@ import javax.inject.Named;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
@@ -39,6 +40,7 @@ import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
public abstract class HashTableTemplate implements HashTable {
@@ -131,8 +133,8 @@ public abstract class HashTableTemplate implements HashTable {
boolean success = false;
try {
for (VectorWrapper<?> w : htContainerOrig) {
- @SuppressWarnings("resource")
ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
+ htContainer.add(vv); // add to container before actual allocation (to allow clearing in case of an OOM)
// Capacity for "hashValues" and "links" vectors is BATCH_SIZE records. It is better to allocate space for
// "key" vectors to store as close to as BATCH_SIZE records. A new BatchHolder is created when either BATCH_SIZE
@@ -148,8 +150,6 @@ public abstract class HashTableTemplate implements HashTable {
} else {
vv.allocateNew();
}
-
- htContainer.add(vv);
}
links = allocMetadataVector(HashTable.BATCH_SIZE, EMPTY_SLOT);
@@ -158,19 +158,17 @@ public abstract class HashTableTemplate implements HashTable {
} finally {
if (!success) {
htContainer.clear();
- if (links != null) {
- links.clear();
- }
+ if (links != null) { links.clear();}
}
}
}
private void init(IntVector links, IntVector hashValues, int size) {
for (int i = 0; i < size; i++) {
- links.getMutator().setSafe(i, EMPTY_SLOT);
+ links.getMutator().set(i, EMPTY_SLOT);
}
for (int i = 0; i < size; i++) {
- hashValues.getMutator().setSafe(i, 0);
+ hashValues.getMutator().set(i, 0);
}
links.getMutator().setValueCount(size);
hashValues.getMutator().setValueCount(size);
@@ -215,6 +213,8 @@ public abstract class HashTableTemplate implements HashTable {
int currentIdxWithinBatch = currentIdx & BATCH_MASK;
setValue(incomingRowIdx, currentIdxWithinBatch);
+ // setValue may OOM while doubling one of the VarChar Key Value Vectors
+ // This would be caught and retried later (setValue() is idempotent)
// the previous entry in this hash chain should now point to the entry in this currentIdx
if (lastEntryBatch != null) {
@@ -223,8 +223,8 @@ public abstract class HashTableTemplate implements HashTable {
// since this is the last entry in the hash chain, the links array at position currentIdx
// will point to a null (empty) slot
- links.getMutator().setSafe(currentIdxWithinBatch, EMPTY_SLOT);
- hashValues.getMutator().setSafe(currentIdxWithinBatch, hashValue);
+ links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
+ hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
@@ -235,7 +235,7 @@ public abstract class HashTableTemplate implements HashTable {
}
private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) {
- links.getMutator().setSafe(lastEntryIdxWithinBatch, currentIdx);
+ links.getMutator().set(lastEntryIdxWithinBatch, currentIdx);
}
private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) {
@@ -254,9 +254,9 @@ public abstract class HashTableTemplate implements HashTable {
int newStartIdx = newStartIndices.getAccessor().get(bucketIdx);
if (newStartIdx == EMPTY_SLOT) { // new bucket was empty
- newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry
- newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
- newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+ newStartIndices.getMutator().set(bucketIdx, entryIdx); // update the start index to point to entry
+ newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
+ newHashValues.getMutator().set(entryIdxWithinBatch, hash);
if (EXTRA_DEBUG) {
logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, " +
@@ -279,9 +279,9 @@ public abstract class HashTableTemplate implements HashTable {
}
if (bh == this && newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
- newLinks.getMutator().setSafe(idxWithinBatch, entryIdx);
- newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
- newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+ newLinks.getMutator().set(idxWithinBatch, entryIdx);
+ newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
+ newHashValues.getMutator().set(entryIdxWithinBatch, hash);
if (EXTRA_DEBUG) {
logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, " +
@@ -293,10 +293,10 @@ public abstract class HashTableTemplate implements HashTable {
break;
} else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
- bh.links.getMutator().setSafe(idxWithinBatch, entryIdx); // update the link in the other batch
- newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this
+ bh.links.getMutator().set(idxWithinBatch, entryIdx); // update the link in the other batch
+ newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this
// batch to mark end of the hash chain
- newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
+ newHashValues.getMutator().set(entryIdxWithinBatch, hash);
if (EXTRA_DEBUG) {
logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, " +
@@ -402,8 +402,8 @@ public abstract class HashTableTemplate implements HashTable {
private void clear() {
htContainer.clear();
- links.clear();
- hashValues.clear();
+ if ( links != null ) { links.clear(); }
+ if ( hashValues != null ) { hashValues.clear(); }
}
// Only used for internal debugging. Get the value vector at a particular index from the htContainer.
@@ -566,6 +566,17 @@ public abstract class HashTableTemplate implements HashTable {
return rounded;
}
+ private void retryAfterOOM(boolean batchAdded) throws RetryAfterSpillException {
+ // If a batch was added then undo; otherwise when retrying this put() we'd miss a NEW_BATCH_ADDED
+ if ( batchAdded ) {
+ logger.trace("OOM - Removing index {} from the batch holders list",batchHolders.size() - 1);
+ BatchHolder bh = batchHolders.remove(batchHolders.size() - 1);
+ bh.clear();
+ }
+ freeIndex--;
+ throw new RetryAfterSpillException();
+ }
+
public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
return getHashBuild(incomingRowIdx);
}
@@ -583,7 +594,7 @@ public abstract class HashTableTemplate implements HashTable {
* @return Status - the key(s) was ADDED or was already PRESENT
*/
@Override
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException {
+ public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException {
int bucketIndex = getBucketIndex(hashCode, numBuckets());
int startIdx = startIndices.getAccessor().get(bucketIndex);
@@ -609,17 +620,42 @@ public abstract class HashTableTemplate implements HashTable {
// no match was found, so insert a new entry
currentIdx = freeIndex++;
- boolean addedBatch = addBatchIfNeeded(currentIdx);
+ boolean addedBatch = false;
+ try { // ADD A BATCH
+ addedBatch = addBatchIfNeeded(currentIdx);
+ } catch (OutOfMemoryException OOME) {
+ retryAfterOOM( currentIdx < batchHolders.size() * BATCH_SIZE );
+ }
+
+ try { // INSERT ENTRY
+ BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
+
+ bh.insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
+ numEntries++;
+ } catch (OutOfMemoryException OOME) { retryAfterOOM( addedBatch ); }
+
+ try { // RESIZE HT
+ /* Resize hash table if needed and transfer the metadata
+ * Resize only after inserting the current entry into the hash table
+ * Otherwise our calculated lastEntryBatch and lastEntryIdx
+ * become invalid after resize.
+ */
+ resizeAndRehashIfNeeded();
+ } catch (OutOfMemoryException OOME) {
+ numEntries--; // undo - insert entry
+ if (lastEntryBatch != null) { // undo last added link in chain (if any)
+ lastEntryBatch.updateLinks(lastEntryIdxWithinBatch, EMPTY_SLOT);
+ }
+ retryAfterOOM( addedBatch );
+ }
if (EXTRA_DEBUG) {
logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
}
- insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch);
-
// if there was no hash chain at this bucket, need to update the start index array
if (startIdx == EMPTY_SLOT) {
- startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx);
+ startIndices.getMutator().set(getBucketIndex(hashCode, numBuckets()), currentIdx);
}
htIdxHolder.value = currentIdx;
return addedBatch ? PutStatus.NEW_BATCH_ADDED :
@@ -628,21 +664,6 @@ public abstract class HashTableTemplate implements HashTable {
PutStatus.KEY_ADDED; // otherwise
}
- private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException {
-
- BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
-
- bh.insertEntry(incomingRowIdx, currentIdx, hashValue, lastEntryBatch, lastEntryIdx);
- numEntries++;
-
- /* Resize hash table if needed and transfer the metadata
- * Resize only after inserting the current entry into the hash table
- * Otherwise our calculated lastEntryBatch and lastEntryIdx
- * becomes invalid after resize.
- */
- resizeAndRehashIfNeeded();
- }
-
// Return -1 if key is not found in the hash table. Otherwise, return the global index of the key
@Override
public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException {
@@ -690,8 +711,6 @@ public abstract class HashTableTemplate implements HashTable {
return;
}
- long t0 = System.currentTimeMillis();
-
if (EXTRA_DEBUG) {
logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold);
}
@@ -704,13 +723,23 @@ public abstract class HashTableTemplate implements HashTable {
return;
}
- int newSize = 2 * tableSize;
+ int newTableSize = 2 * tableSize;
+ newTableSize = roundUpToPowerOf2(newTableSize);
- tableSize = roundUpToPowerOf2(newSize);
+ // if there is not enough memory available to allocate the new hash-table, plus the new links and
+ // the new hash-values (which replace the existing ones inside rehash() ), then throw an OOM
+ if ( 4 /* sizeof(int) */ * ( newTableSize + 2 * HashTable.BATCH_SIZE /* links + hashValues */)
+ >= allocator.getLimit() - allocator.getAllocatedMemory()) {
+ throw new OutOfMemoryException("Resize Hash Table");
+ }
+
+ tableSize = newTableSize;
if (tableSize > MAXIMUM_CAPACITY) {
tableSize = MAXIMUM_CAPACITY;
}
+ long t0 = System.currentTimeMillis();
+
// set the new threshold based on the new table size and load factor
threshold = (int) Math.ceil(tableSize * htConfig.getLoadFactor());
@@ -744,11 +773,7 @@ public abstract class HashTableTemplate implements HashTable {
*
*/
public void reset() {
- // long before = allocator.getAllocatedMemory();
this.clear(); // Clear all current batch holders and hash table (i.e. free their memory)
- // long after = allocator.getAllocatedMemory();
-
- // logger.debug("Reinit Hash Table: Memory before {} After {} Percent after: {}",before,after, (100 * after ) / before);
freeIndex = 0; // all batch holders are gone
// reallocate batch holders, and the hash table to the original size
@@ -759,7 +784,7 @@ public abstract class HashTableTemplate implements HashTable {
incomingBuild = newIncoming;
reset();
try {
- updateBatches(); // Needed ? (to update the new incoming?)
+ updateBatches(); // Needed to update the value vectors in the generated code with the new incoming
} catch (SchemaChangeException e) {
throw new IllegalStateException("Unexpected schema change", e);
} catch(IndexOutOfBoundsException ioob) {
@@ -777,7 +802,7 @@ public abstract class HashTableTemplate implements HashTable {
IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator);
vector.allocateNew(size);
for (int i = 0; i < size; i++) {
- vector.getMutator().setSafe(i, initialValue);
+ vector.getMutator().set(i, initialValue);
}
vector.getMutator().setValueCount(size);
return vector;
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 8c899aa..481bea8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
@@ -358,8 +359,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// For every record in the build batch , hash the key columns
for (int i = 0; i < currentRecordCount; i++) {
int hashCode = hashTable.getHashCode(i);
- hashTable.put(i, htIndex, hashCode);
-
+ try {
+ hashTable.put(i, htIndex, hashCode);
+ } catch (RetryAfterSpillException RE) { throw new OutOfMemoryException("HT put");} // Hash Join can not retry yet
/* Use the global index returned by the hash table, to store
* the current record index and batch index. This will be used
* later when we probe and find a match.
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
index f76757f..3e021ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java
@@ -438,7 +438,7 @@ public class RecordBatchSizer {
public boolean hasSv2() { return hasSv2; }
public int avgDensity() { return avgDensity; }
public int netSize() { return netBatchSize; }
- public int maxSize() { return maxSize; }
+ public int maxAvgColumnSize() { return maxSize / rowCount; }
public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index a269954..9a6420a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.Sort;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.hadoop.conf.Configuration;
@@ -317,7 +316,8 @@ public class SpillSet {
@Override
public void deleteDir(String fragmentSpillDir) throws IOException {
- new File(baseDir, fragmentSpillDir).delete();
+ boolean deleted = new File(baseDir, fragmentSpillDir).delete();
+ if ( ! deleted ) { throw new IOException("Failed to delete: " + fragmentSpillDir);}
}
@Override
@@ -359,17 +359,10 @@ public class SpillSet {
private long writeBytes;
public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
- this(context.getConfig(), context.getHandle(), popConfig,
- // Endpoint appears to be null in some tests.
- context.getDrillbitContext() == null ? null :
- context.getDrillbitContext().getEndpoint());
+ this(context.getConfig(), context.getHandle(), popConfig);
}
- public SpillSet(FragmentContext context, PhysicalOperator popConfig, DrillbitEndpoint ep) {
- this(context.getConfig(), context.getHandle(), popConfig, ep);
- }
-
- public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig, DrillbitEndpoint ep) {
+ public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popConfig) {
String operName;
// Set the spill options from the configuration
@@ -421,15 +414,7 @@ public class SpillSet {
fileManager = new HadoopFileManager(spillFs);
}
- // If provided with a prefix to identify the Drillbit, prepend that to the
- // spill directory.
-
- String nodeDir = "";
- if (ep != null && ep.getAddress() != null) {
- nodeDir = ep.getAddress() + "-" + ep.getUserPort() + "_";
- }
- spillDirName = String.format("%s%s_%s_%s-%s-%s",
- nodeDir,
+ spillDirName = String.format("%s_%s_%s-%s-%s",
QueryIdHelper.getQueryId(handle.getQueryId()),
operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
}
@@ -491,6 +476,7 @@ public class SpillSet {
// since this is meant to be used in a batches's cleanup, we don't propagate the exception
logger.warn("Unable to delete spill directory " + path, e);
}
+ currSpillDirs.clear(); // in case close() is called again
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index ea1d605..6a97c29 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -217,8 +217,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
this.incoming = incoming;
SortConfig sortConfig = new SortConfig(context.getConfig());
- SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(),
- popConfig, context.getIdentity());
+ SpillSet spillSet = new SpillSet(context.getConfig(), context.getHandle(), popConfig);
OperExecContext opContext = new OperExecContextImpl(context, oContext, popConfig, injector);
PriorityQueueCopierWrapper copierHolder = new PriorityQueueCopierWrapper(opContext);
SpilledRuns spilledRuns = new SpilledRuns(opContext, spillSet, copierHolder);
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 75bcc1f..d1d56a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -118,6 +118,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR),
new OptionDefinition(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR), // for tuning
+ new OptionDefinition(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR), // for testing
new OptionDefinition(ExecConstants.HASHAGG_FALLBACK_ENABLED_VALIDATOR), // for enable/disable unbounded HashAgg
new OptionDefinition(ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION),
new OptionDefinition(ExecConstants.OUTPUT_FORMAT_VALIDATOR),
http://git-wip-us.apache.org/repos/asf/drill/blob/d77ab318/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4e7bbdb..0b8d605 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -229,22 +229,14 @@ drill.exec: {
directories: [ "/tmp/drill/spill" ]
},
hashagg: {
- // An internal tuning; should not be changed
- min_batches_per_partition: 3,
- // An option for testing - force a memory limit
- mem_limit: 0,
- // The max number of partitions in each hashagg operator
- // This number is tuned down when memory is limited
- // Setting it to 1 means: No spilling
- num_partitions: 32,
spill: {
- // -- The 2 options below can be used to override the common ones
- // -- (common to all spilling operators)
- // File system to use. Local file system by default.
- fs: ${drill.exec.spill.fs},
- // List of directories to use. Directories are created
- // if they do not exist.
- directories: ${drill.exec.spill.directories},
+ // -- The 2 options below can be used to override the common ones
+ // -- (common to all spilling operators)
+ // File system to use. Local file system by default.
+ fs: ${drill.exec.spill.fs},
+ // List of directories to use. Directories are created
+ // if they do not exist.
+ directories: ${drill.exec.spill.directories},
}
},
sort: {
@@ -370,7 +362,6 @@ drill.exec.options: {
debug.validate_iterators : false,
debug.validate_vectors :false,
drill.exec.functions.cast_empty_string_to_null: false,
- drill.exec.hashagg.min_batches_per_partition : 3,
# Setting to control if HashAgg should fallback to older behavior of consuming
# unbounded memory. In case of 2 phase Agg when available memory is not enough
# to start at least 2 partitions then HashAgg fallbacks to this case. It can be
@@ -388,8 +379,10 @@ drill.exec.options: {
exec.enable_bulk_load_table_list: false,
exec.enable_union_type: false,
exec.errors.verbose: false,
- exec.hashagg.mem_limit : 0,
- exec.hashagg.num_partitions :32,
+ exec.hashagg.mem_limit: 0,
+ exec.hashagg.min_batches_per_partition: 2,
+ exec.hashagg.num_partitions: 32,
+ exec.hashagg.use_memory_prediction: true,
exec.impersonation.inbound_policies: "[]",
exec.java.compiler.exp_in_method_size: 50,
exec.java_compiler : "DEFAULT",