You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:36 UTC
[13/13] drill git commit: DRILL-4134: Allocator Improvements
DRILL-4134: Allocator Improvements
- make Allocator mostly lockless
- change BaseAllocator maps to direct references
- add documentation around memory management model
- move transfer and ownership methods to DrillBuf
- Improve debug messaging.
- Fix/revert sort changes
- Remove unused fragment limit flag
- Add time to HistoricalLog events
- Remove reservation amount from RootAllocator constructor (since not allowed)
- Fix concurrency issue where allocator is closing at same moment as incoming batch transfer, causing leaked memory and/or query failure.
- Add new AutoCloseables.close(Iterable<AutoCloseable>)
- Remove extraneous DataResponseHandler and Impl (and update TestBitRpc to use smarter mock of FragmentManager)
- Remove the concept of poison pill record batches, using instead FragmentContext.isOverMemoryLimit()
- Update incoming data batches so that they are transferred under protection of a close lock
- Improve field names in IncomingBuffers and move synchronization to collectors as opposed to IncomingBuffers (also change decrementing to decrementToZero rather than two part check).
This closes #238.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/809f4620
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/809f4620
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/809f4620
Branch: refs/heads/master
Commit: 809f4620d7d82c72240212de13b993049550959d
Parents: 53dcabe
Author: Jacques Nadeau <ja...@apache.org>
Authored: Sun Nov 15 17:26:02 2015 -0800
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Mon Dec 21 23:52:32 2015 -0800
----------------------------------------------------------------------
.../org/apache/drill/common/AutoCloseables.java | 2 +-
.../org/apache/drill/common/HistoricalLog.java | 68 +-
.../org/apache/drill/common/StackTrace.java | 9 +-
.../common/concurrent/AutoCloseableLock.java | 43 +
.../hive/HiveDrillNativeScanBatchCreator.java | 8 +-
distribution/pom.xml | 5 -
distribution/src/assemble/bin.xml | 1 -
exec/java-exec/pom.xml | 2 +-
.../expr/fn/impl/StringFunctionHelpers.java | 4 +-
.../apache/drill/exec/expr/fn/impl/XXHash.java | 4 +-
.../drill/exec/memory/RootAllocatorFactory.java | 40 +
.../drill/exec/ops/BufferManagerImpl.java | 3 +-
.../apache/drill/exec/ops/FragmentContext.java | 63 +-
.../drill/exec/ops/OperatorContextImpl.java | 17 +-
.../org/apache/drill/exec/ops/QueryContext.java | 14 +-
.../drill/exec/physical/impl/BaseRootExec.java | 2 +-
.../drill/exec/physical/impl/ScanBatch.java | 2 +-
.../exec/physical/impl/SingleSenderCreator.java | 3 +-
.../BroadcastSenderRootExec.java | 3 +-
.../impl/mergereceiver/MergingRecordBatch.java | 5 +-
.../PartitionSenderRootExec.java | 3 +-
.../impl/sort/SortRecordBatchBuilder.java | 49 +-
.../physical/impl/trace/TraceRecordBatch.java | 7 +-
.../impl/union/UnionAllRecordBatch.java | 15 +-
.../UnorderedReceiverBatch.java | 7 +-
.../physical/impl/xsort/ExternalSortBatch.java | 51 +-
.../impl/xsort/PriorityQueueCopier.java | 3 +-
.../impl/xsort/PriorityQueueCopierTemplate.java | 8 +-
.../drill/exec/record/AbstractRecordBatch.java | 4 +-
.../drill/exec/record/RecordBatchLoader.java | 6 +-
.../apache/drill/exec/record/WritableBatch.java | 5 +-
.../drill/exec/rpc/control/ControlClient.java | 2 +-
.../drill/exec/rpc/control/ControlServer.java | 2 +-
.../apache/drill/exec/rpc/data/DataClient.java | 2 +-
.../exec/rpc/data/DataConnectionCreator.java | 14 +-
.../exec/rpc/data/DataResponseHandler.java | 34 -
.../exec/rpc/data/DataResponseHandlerImpl.java | 58 -
.../apache/drill/exec/rpc/data/DataServer.java | 109 +-
.../drill/exec/rpc/data/IncomingDataBatch.java | 73 +
.../apache/drill/exec/rpc/user/UserServer.java | 4 +-
.../org/apache/drill/exec/server/Drillbit.java | 2 +-
.../drill/exec/service/ServiceEngine.java | 6 +-
.../exec/store/dfs/easy/EasyFormatPlugin.java | 10 +-
.../exec/store/parquet/ParquetRecordWriter.java | 2 +-
.../store/parquet/ParquetScanBatchCreator.java | 6 +-
.../drill/exec/store/sys/MemoryIterator.java | 4 +-
.../org/apache/drill/exec/work/WorkManager.java | 8 -
.../exec/work/batch/AbstractDataCollector.java | 15 +-
.../exec/work/batch/BaseRawBatchBuffer.java | 28 +-
.../drill/exec/work/batch/IncomingBuffers.java | 82 +-
.../exec/work/batch/SpoolingRawBatchBuffer.java | 12 +-
.../work/batch/UnlimitedRawBatchBuffer.java | 2 +-
.../apache/drill/exec/work/foreman/Foreman.java | 7 +-
.../exec/work/fragment/FragmentManager.java | 4 +-
.../work/fragment/NonRootFragmentManager.java | 4 +-
.../exec/work/fragment/RootFragmentManager.java | 4 +-
.../java/org/apache/drill/BaseTestQuery.java | 13 +-
.../apache/drill/TestAllocationException.java | 71 -
.../drill/exec/memory/TestAllocators.java | 273 ++-
.../drill/exec/memory/TestBaseAllocator.java | 651 -------
.../exec/physical/impl/TestOptiqPlans.java | 3 +-
.../exec/record/vector/TestValueVector.java | 23 +-
.../apache/drill/exec/server/TestBitRpc.java | 84 +-
.../testing/TestCountDownLatchInjection.java | 13 +-
.../exec/testing/TestExceptionInjection.java | 21 +-
.../drill/exec/testing/TestPauseInjection.java | 17 +-
.../drill/exec/testing/TestResourceLeak.java | 18 +-
.../complex/writer/TestPromotableWriter.java | 2 -
exec/memory/base/pom.xml | 10 +
.../src/main/java/io/netty/buffer/DrillBuf.java | 714 ++------
.../java/io/netty/buffer/ExpandableByteBuf.java | 55 +
.../java/io/netty/buffer/FakeAllocator.java | 167 --
.../main/java/io/netty/buffer/LargeBuffer.java | 297 +---
.../io/netty/buffer/MutableWrappedByteBuf.java | 336 ++++
.../netty/buffer/PooledByteBufAllocatorL.java | 312 ++--
.../netty/buffer/UnsafeDirectLittleEndian.java | 316 ++--
.../apache/drill/exec/memory/Accountant.java | 274 +++
.../org/apache/drill/exec/memory/Accountor.java | 43 -
.../exec/memory/AllocationReservation.java | 86 +
.../exec/memory/AllocatorClosedException.java | 31 +
.../drill/exec/memory/AllocatorManager.java | 356 ++++
.../apache/drill/exec/memory/BaseAllocator.java | 739 ++++++++
.../drill/exec/memory/BoundsChecking.java | 1 +
.../drill/exec/memory/BufferAllocator.java | 155 +-
.../drill/exec/memory/ChildAllocator.java | 53 +
.../exec/memory/DrillByteBufAllocator.java | 141 ++
.../apache/drill/exec/memory/LimitConsumer.java | 28 -
.../java/org/apache/drill/exec/memory/README.md | 121 ++
.../apache/drill/exec/memory/RootAllocator.java | 39 +
.../apache/drill/exec/memory/package-info.java | 24 +
.../base/src/main/resources/drill-module.conf | 25 +
.../drill/exec/memory/TestAccountant.java | 163 ++
.../drill/exec/memory/TestBaseAllocator.java | 645 +++++++
.../apache/drill/exec/memory/TestEndianess.java | 43 +
exec/memory/impl/pom.xml | 68 -
.../apache/drill/exec/memory/AccountorImpl.java | 437 -----
.../drill/exec/memory/AllocationPolicy.java | 38 -
.../exec/memory/AllocationPolicyAgent.java | 69 -
.../exec/memory/AllocationReservation.java | 152 --
.../exec/memory/AllocatorClosedException.java | 31 -
.../drill/exec/memory/AllocatorOwner.java | 40 -
.../exec/memory/AllocatorsStatsMXBean.java | 34 -
.../drill/exec/memory/AtomicRemainder.java | 215 ---
.../apache/drill/exec/memory/BaseAllocator.java | 1654 ------------------
.../apache/drill/exec/memory/BufferLedger.java | 94 -
.../exec/memory/ChainedAllocatorOwner.java | 60 -
.../drill/exec/memory/ChildAllocator.java | 47 -
.../apache/drill/exec/memory/RootAllocator.java | 125 --
.../drill/exec/memory/RootAllocatorFactory.java | 43 -
.../drill/exec/memory/TopLevelAllocator.java | 408 -----
.../impl/src/main/resources/drill-module.conf | 25 -
.../apache/drill/exec/memory/TestEndianess.java | 43 -
exec/memory/pom.xml | 1 -
.../exec/rpc/BasicClientWithConnection.java | 2 +-
.../exec/expr/fn/impl/ByteFunctionHelpers.java | 8 +-
.../apache/drill/exec/util/DecimalUtility.java | 23 +-
.../org/apache/drill/exec/proto/BitData.java | 94 +-
.../apache/drill/exec/proto/SchemaBitData.java | 7 -
.../exec/proto/beans/FragmentRecordBatch.java | 23 -
protocol/src/main/protobuf/BitData.proto | 1 -
120 files changed, 4394 insertions(+), 6661 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/AutoCloseables.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/AutoCloseables.java b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
index 43cbbf5..fcdfe14 100644
--- a/common/src/main/java/org/apache/drill/common/AutoCloseables.java
+++ b/common/src/main/java/org/apache/drill/common/AutoCloseables.java
@@ -68,7 +68,7 @@ public class AutoCloseables {
* Closes all autoCloseables if not null and suppresses subsequent exceptions if more than one
* @param autoCloseables the closeables to close
*/
- public static void close(Collection<? extends AutoCloseable> ac) throws Exception {
+ public static void close(Iterable<? extends AutoCloseable> ac) throws Exception {
Exception topLevelException = null;
for (AutoCloseable closeable : ac) {
try {
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/HistoricalLog.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/HistoricalLog.java b/common/src/main/java/org/apache/drill/common/HistoricalLog.java
index 49b7c33..3331215 100644
--- a/common/src/main/java/org/apache/drill/common/HistoricalLog.java
+++ b/common/src/main/java/org/apache/drill/common/HistoricalLog.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.common;
+import java.util.Arrays;
import java.util.LinkedList;
import org.slf4j.Logger;
@@ -30,9 +31,11 @@ public class HistoricalLog {
private static class Event {
private final String note; // the event text
private final StackTrace stackTrace; // where the event occurred
+ private final long time;
public Event(final String note) {
this.note = note;
+ this.time = System.nanoTime();
stackTrace = new StackTrace();
}
}
@@ -106,8 +109,8 @@ public class HistoricalLog {
*
* @param sb {@link StringBuilder} to write to
*/
- public void buildHistory(final StringBuilder sb) {
- buildHistory(sb, null);
+ public void buildHistory(final StringBuilder sb, boolean includeStackTrace) {
+ buildHistory(sb, 0, includeStackTrace);
}
/**
@@ -119,34 +122,50 @@ public class HistoricalLog {
* @param additional an extra string that will be written between the identifying
* information and the history; often used for a current piece of state
*/
- public synchronized void buildHistory(final StringBuilder sb, final CharSequence additional) {
- sb.append('\n')
- .append(idString);
- if (additional != null) {
- sb.append('\n')
- .append(additional)
- .append('\n');
- }
+ /**
+ *
+ * @param sb
+ * @param indexLevel
+ * @param includeStackTrace
+ */
+ public synchronized void buildHistory(final StringBuilder sb, int indent, boolean includeStackTrace) {
+ final char[] indentation = new char[indent];
+ final char[] innerIndentation = new char[indent + 2];
+ Arrays.fill(indentation, ' ');
+ Arrays.fill(innerIndentation, ' ');
+
+ sb.append(indentation)
+ .append("event log for: ")
+ .append(idString)
+ .append('\n');
- sb.append(" event log\n");
if (firstEvent != null) {
- sb.append(" ")
+ sb.append(innerIndentation)
+ .append(firstEvent.time)
+ .append(' ')
.append(firstEvent.note)
.append('\n');
- firstEvent.stackTrace.writeToBuilder(sb, 4);
+ if (includeStackTrace) {
+ firstEvent.stackTrace.writeToBuilder(sb, indent + 2);
+ }
for(final Event event : history) {
if (event == firstEvent) {
continue;
}
-
- sb.append(" ")
+ sb.append(innerIndentation)
+ .append(" ")
+ .append(event.time)
+ .append(' ')
.append(event.note)
.append('\n');
- event.stackTrace.writeToBuilder(sb, 4);
+ if (includeStackTrace) {
+ event.stackTrace.writeToBuilder(sb, indent + 2);
+ sb.append('\n');
+ }
}
}
}
@@ -157,23 +176,10 @@ public class HistoricalLog {
* events with their stack traces.
*
* @param logger {@link Logger} to write to
- * @param additional an extra string that will be written between the identifying
- * information and the history; often used for a current piece of state
*/
- public void logHistory(final Logger logger, final CharSequence additional) {
+ public void logHistory(final Logger logger) {
final StringBuilder sb = new StringBuilder();
- buildHistory(sb, additional);
+ buildHistory(sb, 0, true);
logger.debug(sb.toString());
}
-
- /**
- * Write the history of this object to the given {@link Logger}. The history
- * includes the identifying string provided at construction time, and all the recorded
- * events with their stack traces.
- *
- * @param logger {@link Logger} to write to
- */
- public void logHistory(final Logger logger) {
- logHistory(logger, null);
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/StackTrace.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/StackTrace.java b/common/src/main/java/org/apache/drill/common/StackTrace.java
index de64ed7..1dbffe6 100644
--- a/common/src/main/java/org/apache/drill/common/StackTrace.java
+++ b/common/src/main/java/org/apache/drill/common/StackTrace.java
@@ -36,13 +36,14 @@ public class StackTrace {
/**
* Write the stack trace to a StringBuilder.
- *
- * @param sb where to write it
- * @param indent how many spaces to indent each line
+ * @param sb
+ * where to write it
+ * @param indent
+ * how many double spaces to indent each line
*/
public void writeToBuilder(final StringBuilder sb, final int indent) {
// create the indentation string
- final char[] indentation = new char[indent];
+ final char[] indentation = new char[indent * 2];
Arrays.fill(indentation, ' ');
// write the stack trace in standard Java format
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
new file mode 100644
index 0000000..91d50b4
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/concurrent/AutoCloseableLock.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.concurrent;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Simple wrapper class that allows Locks to be released via an try-with-resources block.
+ */
+public class AutoCloseableLock implements AutoCloseable {
+
+ private final Lock lock;
+
+ public AutoCloseableLock(Lock lock) {
+ this.lock = lock;
+ }
+
+ public AutoCloseableLock open() {
+ lock.lock();
+ return this;
+ }
+
+ @Override
+ public void close() {
+ lock.unlock();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
index 1a76136..9f53971 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeScanBatchCreator.java
@@ -24,8 +24,6 @@ import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -55,6 +53,9 @@ import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
@SuppressWarnings("unused")
public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
@@ -94,8 +95,7 @@ public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNa
}
}
- final OperatorContext oContext = context.newOperatorContext(config,
- false /* ScanBatch is not subject to fragment memory limit */);
+ final OperatorContext oContext = context.newOperatorContext(config);
int currentPartitionIndex = 0;
final List<RecordReader> readers = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index f8aced5..5aaf09d 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -45,11 +45,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.drill.memory</groupId>
- <artifactId>drill-memory-impl</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.drill.exec</groupId>
<artifactId>drill-rpc</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index f8ada9b..449ac6c 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -89,7 +89,6 @@
<include>org.apache.drill:drill-logical:jar</include>
<include>org.apache.drill.exec:vector:jar</include>
<include>org.apache.drill.memory:drill-memory-base:jar</include>
- <include>org.apache.drill.memory:drill-memory-impl:jar</include>
<include>org.apache.drill.exec:drill-rpc:jar</include>
<include>org.apache.drill.exec:drill-java-exec:jar</include>
<include>org.apache.drill.contrib.storage-hive:drill-storage-hive-core</include>
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 3cd2585..efb549c 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -289,7 +289,7 @@
</dependency>
<dependency>
<groupId>org.apache.drill.memory</groupId>
- <artifactId>drill-memory-impl</artifactId>
+ <artifactId>drill-memory-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
index c919f28..3d0f170 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctionHelpers.java
@@ -30,8 +30,6 @@ import com.google.common.base.Charsets;
public class StringFunctionHelpers {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringFunctionHelpers.class);
- private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
-
static final int RADIX = 10;
static final long MAX_LONG = -Long.MAX_VALUE / RADIX;
static final int MAX_INT = -Integer.MAX_VALUE / RADIX;
@@ -212,7 +210,7 @@ public class StringFunctionHelpers {
private static final ISOChronology CHRONOLOGY = org.joda.time.chrono.ISOChronology.getInstanceUTC();
public static long getDate(DrillBuf buf, int start, int end){
- if(BOUNDS_CHECKING_ENABLED){
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
buf.checkBytes(start, end);
}
return memGetDate(buf.memoryAddress(), start, end);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
index 033e2cb..0124b2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/XXHash.java
@@ -27,8 +27,6 @@ import com.google.common.primitives.UnsignedLongs;
public final class XXHash {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(XXHash.class);
- private static final boolean BOUNDS_CHECKING_ENABLED = BoundsChecking.BOUNDS_CHECKING_ENABLED;
-
static final long PRIME64_1 = UnsignedLongs.decode("11400714785074694791");
static final long PRIME64_2 = UnsignedLongs.decode("14029467366897019727");
static final long PRIME64_3 = UnsignedLongs.decode("1609587929392839161");
@@ -168,7 +166,7 @@ public final class XXHash {
}
public static long hash64(int start, int end, DrillBuf buffer, long seed){
- if(BOUNDS_CHECKING_ENABLED){
+ if (BoundsChecking.BOUNDS_CHECKING_ENABLED) {
buffer.checkBytes(start, end);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
new file mode 100644
index 0000000..4fad668
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/RootAllocatorFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.memory;
+
+import org.apache.drill.common.config.DrillConfig;
+
+public class RootAllocatorFactory {
+
+ public static final String TOP_LEVEL_MAX_ALLOC = "drill.memory.top.max";
+
+ /**
+ * Constructor to prevent instantiation of this static utility class.
+ */
+ private RootAllocatorFactory() {}
+
+ /**
+ * Create a new Root Allocator
+ * @param drillConfig
+ * the DrillConfig
+ * @return a new root allocator
+ */
+ public static BufferAllocator newRoot(final DrillConfig drillConfig) {
+ return new RootAllocator(Math.min(DrillConfig.getMaxDirectMemory(), drillConfig.getLong(TOP_LEVEL_MAX_ALLOC)));
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index 003fc9b..b8c6796 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -55,9 +55,8 @@ public class BufferManagerImpl implements BufferManager {
}
public DrillBuf getManagedBuffer(int size) {
- DrillBuf newBuf = allocator.buffer(size);
+ DrillBuf newBuf = allocator.buffer(size, this);
managedBuffers.put(newBuf.memoryAddress(), newBuf);
- newBuf.setBufferManager(this);
return newBuf;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index a773c22..304ecf1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.LimitConsumer;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -159,8 +158,10 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
try {
- allocator = context.getAllocator().getChildAllocator(new AsLimitConsumer(), fragment.getMemInitial(),
- fragment.getMemMax(), true);
+ allocator = context.getAllocator().newChildAllocator(
+ "frag:" + QueryIdHelper.getFragmentId(fragment.getHandle()),
+ fragment.getMemInitial(),
+ fragment.getMemMax());
Preconditions.checkNotNull(allocator, "Unable to acuqire allocator");
} catch (final OutOfMemoryException e) {
throw UserException.memoryError(e)
@@ -174,31 +175,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
bufferManager = new BufferManagerImpl(this.allocator);
}
- private class AsLimitConsumer implements LimitConsumer {
- final String identifier = QueryIdHelper.getFragmentId(fragment.getHandle());
-
- @Override
- public String getIdentifier() {
- return identifier;
- }
-
- @Override
- public long getAllocated() {
- return allocator.getAllocatedMemory();
- }
-
- @Override
- public long getLimit() {
- return allocator.getLimit();
- }
-
- @Override
- public void setLimit(long limit) {
- allocator.setLimit(limit);
- }
-
- }
-
/**
* TODO: Remove this constructor when removing the SimpleRootExec (DRILL-2097). This is kept only to avoid modifying
* the long list of test files.
@@ -297,11 +273,6 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return frag;
}
- public LimitConsumer asLimitConsumer() {
- return new AsLimitConsumer();
- }
-
-
/**
* Get this fragment's allocator.
* @return the allocator
@@ -314,11 +285,19 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return allocator;
}
- public BufferAllocator getNewChildAllocator(final long initialReservation,
- final long maximumReservation,
- final boolean applyFragmentLimit) throws OutOfMemoryException {
- return allocator.getChildAllocator(new AsLimitConsumer(), initialReservation, maximumReservation,
- applyFragmentLimit);
+ public BufferAllocator getNewChildAllocator(final String operatorName,
+ final int operatorId,
+ final long initialReservation,
+ final long maximumReservation) throws OutOfMemoryException {
+ return allocator.newChildAllocator(
+ "op:" + QueryIdHelper.getFragmentId(fragment.getHandle()) + ":" + operatorId + ":" + operatorName,
+ initialReservation,
+ maximumReservation
+ );
+ }
+
+ public boolean isOverMemoryLimit() {
+ return allocator.isOverLimit();
}
public <T> T getImplementationClass(final ClassGenerator<T> cg)
@@ -361,16 +340,16 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return buffers;
}
- public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats, boolean applyFragmentLimit)
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats)
throws OutOfMemoryException {
- OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats, applyFragmentLimit);
+ OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats);
contexts.add(context);
return context;
}
- public OperatorContext newOperatorContext(PhysicalOperator popConfig, boolean applyFragmentLimit)
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig)
throws OutOfMemoryException {
- OperatorContextImpl context = new OperatorContextImpl(popConfig, this, applyFragmentLimit);
+ OperatorContextImpl context = new OperatorContextImpl(popConfig, this);
contexts.add(context);
return context;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index a3ec6bd..8217afd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -48,7 +48,6 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private final PhysicalOperator popConfig;
private final OperatorStats stats;
private final BufferManager manager;
- private final boolean applyFragmentLimit;
private DrillFileSystem fs;
private final ExecutorService executor;
@@ -60,21 +59,23 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
*/
private ListeningExecutorService delegatePool;
- public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
- this.applyFragmentLimit=applyFragmentLimit;
- this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+ this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+ popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
this.popConfig = popConfig;
this.manager = new BufferManagerImpl(allocator);
- OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
+ OpProfileDef def =
+ new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
stats = context.getStats().newOperatorStats(def, allocator);
executionControls = context.getExecutionControls();
executor = context.getDrillbitContext().getExecutor();
}
- public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
- this.applyFragmentLimit=applyFragmentLimit;
- this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
+ throws OutOfMemoryException {
+ this.allocator = context.getNewChildAllocator(popConfig.getClass().getSimpleName(),
+ popConfig.getOperatorId(), popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
this.popConfig = popConfig;
this.manager = new BufferManagerImpl(allocator);
this.stats = stats;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index e4f50a1..51a581a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -30,13 +30,14 @@ import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
@@ -52,8 +53,6 @@ import org.apache.drill.exec.util.Utilities;
import com.google.common.collect.Lists;
-// TODO except for a couple of tests, this is only created by Foreman
-// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
public class QueryContext implements AutoCloseable, OptimizerRulesContext {
@@ -80,7 +79,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
*/
private boolean closed = false;
- public QueryContext(final UserSession session, final DrillbitContext drillbitContext) {
+ public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) {
this.drillbitContext = drillbitContext;
this.session = session;
queryOptions = new QueryOptionManager(session.getOptions());
@@ -92,9 +91,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext {
queryContextInfo = Utilities.createQueryContextInfo(session.getDefaultSchemaName());
contextInformation = new ContextInformation(session.getCredentials(), queryContextInfo);
- allocator = drillbitContext.getAllocator().getChildAllocator(null, plannerSettings.getInitialPlanningMemorySize(),
- plannerSettings.getPlanningMemoryLimit(), false);
- // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
+ allocator = drillbitContext.getAllocator().newChildAllocator(
+ "query:" + QueryIdHelper.getQueryId(queryId),
+ PlannerSettings.getInitialPlanningMemorySize(),
+ plannerSettings.getPlanningMemoryLimit());
bufferManager = new BufferManagerImpl(this.allocator);
viewExpansionContext = new ViewExpansionContext(this);
schemaTreesToClose = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index ee6475c..f720f8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -42,7 +42,7 @@ public abstract class BaseRootExec implements RootExec {
private List<CloseableRecordBatch> operators;
public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
- this.oContext = fragmentContext.newOperatorContext(config, stats, true);
+ this.oContext = fragmentContext.newOperatorContext(config, stats);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
oContext.getAllocator());
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 6c763d6..73bebd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -134,7 +134,7 @@ public class ScanBatch implements CloseableRecordBatch {
Iterator<RecordReader> readers)
throws ExecutionSetupException {
this(subScanConfig, context,
- context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
+ context.newOperatorContext(subScanConfig),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 73f3f95..23e97d0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -21,7 +21,6 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
@@ -66,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
- super(context, context.newOperatorContext(config, null, false), config);
+ super(context, context.newOperatorContext(config, null), config);
this.incoming = batch;
assert incoming != null;
handle = context.getHandle();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c731e38..c88c72d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
@@ -61,7 +60,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
BroadcastSender config) throws OutOfMemoryException {
- super(context, context.newOperatorContext(config, null, false), config);
+ super(context, context.newOperatorContext(config, null), config);
this.ok = true;
this.incoming = incoming;
this.config = config;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 1cc598a..8db24af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -76,7 +77,6 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.calcite.rel.RelFieldCollation.Direction;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -126,8 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public MergingRecordBatch(final FragmentContext context,
final MergingReceiverPOP config,
final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
- super(config, context, true, context.newOperatorContext(config, false));
- //super(config, context);
+ super(config, context, true, context.newOperatorContext(config));
this.fragProviders = fragProviders;
this.context = context;
this.outgoingContainer = new VectorContainer(oContext);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 5e9e84c..cfe1b80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
@@ -100,7 +99,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
- super(context, context.newOperatorContext(operator, null, false), operator);
+ super(context, context.newOperatorContext(operator, null), operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 9f452cb..f2302ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -21,18 +21,9 @@ import io.netty.buffer.DrillBuf;
import java.util.ArrayList;
import java.util.List;
-import java.util.Set;
-import com.google.common.collect.Sets;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.AutoCloseablePointer;
-import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.common.types.TypeProtos.DataMode;
-import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.AllocationReservation;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -48,7 +39,6 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.vector.complex.UnionVector;
public class SortRecordBatchBuilder implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortRecordBatchBuilder.class);
@@ -56,12 +46,14 @@ public class SortRecordBatchBuilder implements AutoCloseable {
private final ArrayListMultimap<BatchSchema, RecordBatchData> batches = ArrayListMultimap.create();
private int recordCount;
- private final AutoCloseablePointer<SelectionVector4> pSV4 = new AutoCloseablePointer<>();
- private final AllocationReservation allocationReservation;
private long runningBatches;
+ private SelectionVector4 sv4;
+ private BufferAllocator allocator;
+ final AllocationReservation reservation;
- public SortRecordBatchBuilder(final BufferAllocator allocator) {
- allocationReservation = allocator.newReservation();
+ public SortRecordBatchBuilder(BufferAllocator a) {
+ this.allocator = a;
+ this.reservation = a.newReservation();
}
private long getSize(VectorAccessible batch) {
@@ -94,7 +86,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
if (runningBatches >= Character.MAX_VALUE) {
return false; // allowed in batch.
}
- if (!allocationReservation.add(batch.getRecordCount() * 4)) {
+ if (!reservation.add(batch.getRecordCount() * 4)) {
return false; // sv allocation available.
}
@@ -117,7 +109,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
logger.error(errMsg);
throw new DrillRuntimeException(errMsg);
}
- if (!allocationReservation.add(rbd.getRecordCount() * 4)) {
+ if (!reservation.add(rbd.getRecordCount() * 4)) {
final String errMsg = String.format("Failed to pre-allocate memory for SV. " + "Existing recordCount*4 = %d, " +
"incoming batch recordCount*4 = %d", recordCount * 4, rbd.getRecordCount() * 4);
logger.error(errMsg);
@@ -160,14 +152,13 @@ public class SortRecordBatchBuilder implements AutoCloseable {
assert false : "Invalid to have an empty set of batches with no schemas.";
}
- final DrillBuf svBuffer = allocationReservation.buffer();
+ final DrillBuf svBuffer = reservation.allocateBuffer();
if (svBuffer == null) {
throw new OutOfMemoryError("Failed to allocate direct memory for SV4 vector in SortRecordBatchBuilder.");
}
- pSV4.assignNoChecked(new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE));
+ sv4 = new SelectionVector4(svBuffer, recordCount, Character.MAX_VALUE);
BatchSchema schema = batches.keySet().iterator().next();
List<RecordBatchData> data = batches.get(schema);
- final SelectionVector4 sv4 = pSV4.get();
// now we're going to generate the sv4 pointers
switch (schema.getSelectionVectorMode()) {
@@ -216,16 +207,21 @@ public class SortRecordBatchBuilder implements AutoCloseable {
}
public SelectionVector4 getSv4() {
- return pSV4.get();
+ return sv4;
}
public void clear() {
- DrillAutoCloseables.closeNoChecked(allocationReservation);
for (RecordBatchData d : batches.values()) {
d.container.clear();
}
- batches.clear();
- pSV4.assignNoChecked(null);
+ if (sv4 != null) {
+ sv4.clear();
+ }
+ }
+
+ @Override
+ public void close() {
+ reservation.close();
}
public List<VectorContainer> getHeldRecordBatches() {
@@ -252,11 +248,4 @@ public class SortRecordBatchBuilder implements AutoCloseable {
// We need 4 bytes (SV4) for each record.
return recordCount * 4;
}
-
- @Override
- public void close() {
- clear();
- allocationReservation.close();
- DrillAutoCloseables.closeNoChecked(pSV4);
- }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 78e83d6..ff83cc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -57,6 +58,8 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
private SelectionVector2 sv = null;
+ private final BufferAllocator localAllocator;
+
/* Tag associated with each trace operator */
final String traceTag;
@@ -70,7 +73,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
super(pop, context, incoming);
this.traceTag = pop.traceTag;
logLocation = context.getConfig().getString(ExecConstants.TRACE_DUMP_DIRECTORY);
-
+ localAllocator = context.getNewChildAllocator("trace", 200, 0, Long.MAX_VALUE);
String fileName = getFileName();
/* Create the log file we will dump to and initialize the file descriptors */
@@ -116,7 +119,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
} catch (IOException e) {
throw new RuntimeException(e);
}
- batch.reconstructContainer(container);
+ batch.reconstructContainer(localAllocator, container);
if (incomingHasSv2) {
sv = wrap.getSv2();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 0d47a57..55b9afe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -37,7 +37,6 @@ import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -84,14 +83,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
}
@Override
- public void kill(boolean sendUpstream) {
- if(current != null) {
- current.kill(sendUpstream);
- current = null;
- }
- }
-
- @Override
protected void killIncoming(boolean sendUpstream) {
unionAllInput.getLeftRecordBatch().kill(sendUpstream);
unionAllInput.getRightRecordBatch().kill(sendUpstream);
@@ -278,10 +269,8 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
// This method is used by inner class to clear the current record batch
private void clearCurrentRecordBatch() {
- if (current != null) {
- for(final VectorWrapper<?> v: current) {
- v.clear();
- }
+ for(VectorWrapper<?> v: current) {
+ v.clear();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 00cf295..be7d4ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -79,7 +79,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
// we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
- oContext = context.newOperatorContext(config, false);
+ oContext = context.newOperatorContext(config);
this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
this.stats = oContext.getStats();
@@ -159,7 +159,8 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
batch = getNextBatch();
// skip over empty batches. we do this since these are basically control messages.
- while (batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0 && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
+ while (batch != null && batch.getHeader().getDef().getRecordCount() == 0
+ && (!first || batch.getHeader().getDef().getFieldCount() == 0)) {
batch = getNextBatch();
}
} finally {
@@ -176,7 +177,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
return IterOutcome.NONE;
}
- if (batch.getHeader().getIsOutOfMemory()) {
+ if (context.isOverMemoryLimit()) {
return IterOutcome.OUT_OF_MEMORY;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 1ac82bf..4dbd92d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -157,8 +157,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SPILL_BATCH_GROUP_SIZE = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE);
SPILL_THRESHOLD = config.getInt(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD);
dirs = Iterators.cycle(config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS));
- copierAllocator = oContext.getAllocator().getChildAllocator(
- context.asLimitConsumer(), PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION, true);
+ copierAllocator = oContext.getAllocator().newChildAllocator(oContext.getAllocator().getName() + ":copier",
+ PriorityQueueCopier.INITIAL_ALLOCATION, PriorityQueueCopier.MAX_ALLOCATION);
FragmentHandle handle = context.getHandle();
fileName = String.format("%s/major_fragment_%s/minor_fragment_%s/operator_%s", QueryIdHelper.getQueryId(handle.getQueryId()),
handle.getMajorFragmentId(), handle.getMinorFragmentId(), popConfig.getOperatorId());
@@ -211,18 +211,26 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (sv4 != null) {
sv4.clear();
}
- if (copier != null) {
- copier.close();
- }
- if (copierAllocator != null) {
+
+ try {
+ if (copier != null) {
+ copier.close();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } finally {
copierAllocator.close();
- }
- super.close();
+ super.close();
+
+ if (mSorter != null) {
+ mSorter.clear();
+ }
- if(mSorter != null) {
- mSorter.clear();
}
+
}
+
+
}
@Override
@@ -339,9 +347,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SelectionVector2 sv2;
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
sv2 = incoming.getSelectionVector2().clone();
- if (sv2.getBuffer(false).isRootBuffer()) {
- oContext.getAllocator().takeOwnership(sv2.getBuffer(false));
- }
} else {
try {
sv2 = newSV2();
@@ -473,10 +478,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
batchGroups.addAll(spilledBatchGroups);
spilledBatchGroups = null; // no need to cleanup spilledBatchGroups, all it's batches are in batchGroups now
- // copierAllocator is no longer needed now. Closing it will free memory for this operator
- copierAllocator.close();
- copierAllocator = null;
-
logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
VectorContainer hyperBatch = constructHyperBatch(batchGroups);
createCopier(hyperBatch, batchGroups, container, false);
@@ -587,7 +588,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} finally {
hyperBatch.clear();
}
- takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
long bufSize = getBufferSize(c1);
totalSizeInMemory += bufSize;
logger.debug("mergeAndSpill: final total size in memory = {}", totalSizeInMemory);
@@ -595,25 +595,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return newGroup;
}
- private void takeOwnership(VectorAccessible batch) {
- for (VectorWrapper<?> w : batch) {
- DrillBuf[] bufs = w.getValueVector().getBuffers(false);
- for (DrillBuf buf : bufs) {
- if (buf.isRootBuffer()) {
- oContext.getAllocator().takeOwnership(buf);
- }
- }
- }
- }
-
private long getBufferSize(VectorAccessible batch) {
long size = 0;
for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
for (DrillBuf buf : bufs) {
- if (buf.isRootBuffer()) {
- size += buf.capacity();
- }
+ size += buf.getPossibleMemoryConsumed();
}
}
return size;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index e0d9c2d..ec590c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import java.io.IOException;
import java.util.List;
import org.apache.drill.exec.compile.TemplateClassDefinition;
@@ -38,5 +39,5 @@ public interface PriorityQueueCopier extends AutoCloseable {
new TemplateClassDefinition<>(PriorityQueueCopier.class, PriorityQueueCopierTemplate.class);
@Override
- abstract public void close(); // specify this to leave out the Exception
+ abstract public void close() throws IOException; // specify this to leave out the Exception
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 891907a..5090b33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
import io.netty.buffer.DrillBuf;
+import java.io.IOException;
import java.util.List;
import javax.inject.Named;
@@ -66,7 +67,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
allocateVectors(targetRecordCount);
for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
if (queueSize == 0) {
- close();
return 0;
}
int compoundIndex = vector4.get(0);
@@ -96,7 +96,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
@Override
- public void close() {
+ public void close() throws IOException {
vector4.clear();
for (final VectorWrapper<?> w: outgoing) {
w.getValueVector().clear();
@@ -104,6 +104,10 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
for (final VectorWrapper<?> w : hyperBatch) {
w.clear();
}
+
+ for (BatchGroup batchGroup : batchGroups) {
+ batchGroup.close();
+ }
}
private void siftUp() {
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 229fe65..998665c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -46,11 +46,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected BatchState state;
protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
- this(popConfig, context, true, context.newOperatorContext(popConfig, true));
+ this(popConfig, context, true, context.newOperatorContext(popConfig));
}
protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
- this(popConfig, context, buildSchema, context.newOperatorContext(popConfig, true));
+ this(popConfig, context, buildSchema, context.newOperatorContext(popConfig));
}
protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index ed86358..13164b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -32,13 +32,12 @@ import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* Holds record batch loaded from record batch message.
@@ -120,7 +119,6 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
newVectors.add(vector);
}
- Preconditions.checkArgument(buf == null || bufOffset == buf.capacity());
// rebuild the schema.
final SchemaBuilder builder = BatchSchema.newBuilder();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 488ba69..d39ce5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -21,6 +21,7 @@ import io.netty.buffer.DrillBuf;
import java.util.List;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -58,7 +59,7 @@ public class WritableBatch implements AutoCloseable {
return buffers;
}
- public void reconstructContainer(VectorContainer container) {
+ public void reconstructContainer(BufferAllocator allocator, VectorContainer container) {
Preconditions.checkState(!cleared,
"Attempted to reconstruct a container from a WritableBatch after it had been cleared");
if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */
@@ -67,7 +68,7 @@ public class WritableBatch implements AutoCloseable {
len += b.capacity();
}
- DrillBuf newBuf = buffers[0].getAllocator().buffer(len);
+ DrillBuf newBuf = allocator.buffer(len);
try {
/* Copy data from each buffer into the compound buffer */
int offset = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index 915509f..cc238f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -52,7 +52,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
public ControlClient(DrillbitEndpoint remoteEndpoint, DrillbitEndpoint localEndpoint, ControlMessageHandler handler,
BootStrapContext context, ControlConnectionManager.CloseHandlerCreator closeHandlerFactory) {
super(ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getUnderlyingAllocator(),
+ context.getAllocator().getAsByteBufAllocator(),
context.getBitLoopGroup(),
RpcType.HANDSHAKE,
BitControlHandshake.class,
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 81c886a..a786469 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -46,7 +46,7 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
public ControlServer(ControlMessageHandler handler, BootStrapContext context, ConnectionManagerRegistry connectionRegistry) {
super(
ControlRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getUnderlyingAllocator(),
+ context.getAllocator().getAsByteBufAllocator(),
context.getBitLoopGroup());
this.handler = handler;
this.connectionRegistry = connectionRegistry;
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index d7611b9..9db551b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -48,7 +48,7 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
super(
DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getUnderlyingAllocator(),
+ context.getAllocator().getAsByteBufAllocator(),
context.getBitClientLoopGroup(),
RpcType.HANDSHAKE,
BitServerHandshake.class,
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 5874b31..b2a51ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -21,9 +21,11 @@ import java.io.Closeable;
import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.exception.DrillbitStartupException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
@@ -37,20 +39,23 @@ public class DataConnectionCreator implements Closeable {
private volatile DataServer server;
private final BootStrapContext context;
private final WorkEventBus workBus;
- private final DataResponseHandler dataHandler;
+ private final WorkerBee bee;
private final boolean allowPortHunting;
private ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap();
+ private final BufferAllocator dataAllocator;
- public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) {
+ public DataConnectionCreator(BootStrapContext context, WorkEventBus workBus, WorkerBee bee, boolean allowPortHunting) {
super();
this.context = context;
this.workBus = workBus;
- this.dataHandler = dataHandler;
+ this.bee = bee;
this.allowPortHunting = allowPortHunting;
+ this.dataAllocator = context.getAllocator()
+ .newChildAllocator("rpc-data", 0, Long.MAX_VALUE);
}
public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
- server = new DataServer(context, workBus, dataHandler);
+ server = new DataServer(context, dataAllocator, workBus, bee);
int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
return completeEndpoint;
@@ -68,6 +73,7 @@ public class DataConnectionCreator implements Closeable {
@Override
public void close() {
Closeables.closeQuietly(server);
+ dataAllocator.close();
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
deleted file mode 100644
index 39dcc90..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.data;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-
-public interface DataResponseHandler {
-
- public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch,
- DrillBuf data, AckSender sender) throws FragmentSetupException, IOException;
-
- public void informOutOfMemory();
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
deleted file mode 100644
index e18b94c..0000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.rpc.data;
-
-import io.netty.buffer.DrillBuf;
-
-import java.io.IOException;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.fragment.FragmentManager;
-
-public class DataResponseHandlerImpl implements DataResponseHandler{
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
- private final WorkerBee bee;
-
- public DataResponseHandlerImpl(final WorkerBee bee) {
- this.bee = bee;
- }
-
- @Override
- public void informOutOfMemory() {
- logger.error("Out of memory outside any particular fragment.");
- }
-
- @Override
- public void handle(final FragmentManager manager, final FragmentRecordBatch fragmentBatch,
- final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
-// logger.debug("Fragment Batch received {}", fragmentBatch);
-
- final boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
- if (canRun) {
-// logger.debug("Arriving batch means local batch can run, starting local batch.");
- /*
- * If we've reached the canRun threshold, we'll proceed. This expects handler.handle() to
- * only return a single true.
- */
- bee.startFragmentPendingRemote(manager);
- }
- }
-}