You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:35 UTC
[12/13] drill git commit: DRILL-4134: Allocator Improvements
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
# Review note (DataServer.java): the DataResponseHandler dependency is replaced by a WorkerBee, and the
# ByteBuf allocator passed to BasicServer now comes from the new BufferAllocator constructor argument
# (alloc.getAsByteBufAllocator()) instead of context.getAllocator().getUnderlyingAllocator().
# handle() wraps the incoming header/body in an IncomingDataBatch and offers it to each receiving minor
# fragment's FragmentManager via submit(). It starts at a randomly chosen index and wraps around; the
# inline comment says this balances which fragment receives ownership. When FragmentManager.handle()
# returns true, bee.startFragmentPendingRemote(manager) is called. The old per-fragment send() path
# (shareOwnership plus the OOM_FRAGMENT poison pill) is removed, and the RPC OutOfMemoryHandler now only logs.
# NOTE(review): ThreadLocalRandom.nextInt(bound) throws IllegalArgumentException when bound <= 0. A batch
# with zero receiving minor fragment ids would therefore now fail inside handle(), whereas the old loop
# did nothing. That exception is not covered by the IOException | FragmentSetupException catch shown
# here. Confirm the protocol guarantees at least one receiver, or guard on targetCount > 0.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 56e2ff2..03118d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -24,9 +24,9 @@ import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.memory.AllocatorClosedException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -44,7 +44,7 @@ import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentManager;
import com.google.protobuf.MessageLite;
@@ -55,16 +55,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
private volatile ProxyCloseHandler proxyCloseHandler;
private final BootStrapContext context;
private final WorkEventBus workBus;
- private final DataResponseHandler dataHandler;
+ private final WorkerBee bee;
- public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
+ public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus,
+ WorkerBee bee) {
super(
DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getUnderlyingAllocator(),
+ alloc.getAsByteBufAllocator(),
context.getBitLoopGroup());
this.context = context;
this.workBus = workBus;
- this.dataHandler = dataHandler;
+ this.bee = bee;
}
@Override
@@ -106,9 +107,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
};
}
- private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
-
-
private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
return FragmentHandle.newBuilder()
.setQueryId(batch.getQueryId())
@@ -117,34 +115,50 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
.build();
}
+ private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException,
+ IOException {
+ for (int minor = minorStart; minor < minorStopExclusive; minor++) {
+ final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor));
+ if (manager == null) {
+ // A missing manager means the query already terminated. We can simply drop this data.
+ continue;
+ }
+
+ final boolean canRun = manager.handle(batch);
+ if (canRun) {
+ // logger.debug("Arriving batch means local batch can run, starting local batch.");
+ /*
+ * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single
+ * true. This is guaranteed by the interface.
+ */
+ bee.startFragmentPendingRemote(manager);
+ }
+ }
+
+ }
@Override
protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException {
assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
- final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
+ final AckSender ack = new AckSender(sender);
+
- AckSender ack = new AckSender(sender);
// increment so we don't get false returns.
ack.increment();
+
try {
- if(body == null){
+ final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack);
+ final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
- for(int minor = 0; minor < targetCount; minor++){
- FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
- if(manager != null){
- ack.increment();
- dataHandler.handle(manager, fragmentBatch, null, ack);
- }
- }
+ // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst
+ // multiple fragments.
+ final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
+ submit(batch, firstOwner, targetCount);
+ submit(batch, 0, firstOwner);
- }else{
- for (int minor = 0; minor < targetCount; minor++) {
- send(fragmentBatch, (DrillBuf) body, minor, ack);
- }
- }
} catch (IOException | FragmentSetupException e) {
logger.error("Failure while getting fragment manager. {}",
QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
@@ -159,51 +173,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
}
- private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack)
- throws FragmentSetupException, IOException {
-
- final FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
- if (manager == null) {
- return;
- }
-
- final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
- final Pointer<DrillBuf> out = new Pointer<>();
-
- final boolean withinMemoryEnvelope;
-
- try {
- withinMemoryEnvelope = allocator.shareOwnership((DrillBuf) body, out);
- } catch(final AllocatorClosedException e) {
- /*
- * It can happen that between the time we get the fragment manager and we
- * try to transfer this buffer to it, the fragment may have been cancelled
- * and closed. When that happens, the allocator will be closed when we
- * attempt this. That just means we can drop this data on the floor, since
- * the receiver no longer exists (and no longer needs it).
- *
- * Note that checking manager.isCancelled() before we attempt this isn't enough,
- * because of timing: it may still be cancelled between that check and
- * the attempt to do the memory transfer. To double check ourselves, we
- * do check manager.isCancelled() here, after the fact; it shouldn't
- * change again after its allocator has been closed.
- */
- assert manager.isCancelled();
- return;
- }
-
- if (!withinMemoryEnvelope) {
- // if we over reserved, we need to add poison pill before batch.
- dataHandler.handle(manager, OOM_FRAGMENT, null, null);
- }
-
- ack.increment();
- dataHandler.handle(manager, fragmentBatch, out.value, ack);
-
- // make sure to release the reference count we have to the new buffer.
- // dataHandler.handle should have taken any ownership it needed.
- out.value.release();
- }
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
@@ -226,7 +195,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
return new OutOfMemoryHandler() {
@Override
public void handle() {
- dataHandler.informOutOfMemory();
+ logger.error("Out of memory in RPC layer.");
}
};
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
----------------------------------------------------------------------
# Review note (IncomingDataBatch.java, new file): an immutable holder for a FragmentRecordBatch header
# (required), an optional DrillBuf body and an AckSender (required). Construction does not change the
# body's reference count. Each newRawFragmentBatch(allocator) call transfers the body, if non-null, into
# the given allocator via transferOwnership() and increments the AckSender once. Every RawFragmentBatch
# produced this way therefore accounts for one additional expected ack.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
new file mode 100644
index 0000000..a9bc305
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
+import org.apache.drill.exec.record.RawFragmentBatch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An incoming batch of data. The data is held by the original allocator. Any use of the associated data must be
+ * leveraged through the use of newRawFragmentBatch().
+ */
+public class IncomingDataBatch {
+
+ private final FragmentRecordBatch header;
+ private final DrillBuf body;
+ private final AckSender sender;
+
+ /**
+ * Create a new batch. Does not impact reference counts of body.
+ *
+ * @param header
+ * Batch header
+ * @param body
+ * Data body. Could be null.
+ * @param sender
+ * AckSender to use for underlying RawFragmentBatches.
+ */
+ public IncomingDataBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
+ Preconditions.checkNotNull(header);
+ Preconditions.checkNotNull(sender);
+ this.header = header;
+ this.body = body;
+ this.sender = sender;
+ }
+
+ /**
+ * Create a new RawFragmentBatch based on this incoming data batch that is transferred into the provided allocator.
+ * Also increments the AckSender to expect one additional return message.
+ *
+ * @param allocator
+ * Target allocator that should be associated with data underlying this batch.
+ * @return The newly created RawFragmentBatch
+ */
+ public RawFragmentBatch newRawFragmentBatch(final BufferAllocator allocator) {
+ final DrillBuf transferredBuffer = body == null ? null : body.transferOwnership(allocator).buffer;
+ sender.increment();
+ return new RawFragmentBatch(header, transferredBuffer, sender);
+ }
+
+ public FragmentRecordBatch getHeader() {
+ return header;
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
# Review note (UserServer.java): the ByteBuf allocator handed to BasicServer is now obtained via
# alloc.getAsByteBufAllocator() instead of alloc.getUnderlyingAllocator(). Apart from that, the super(...)
# call is only reflowed across lines.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index ddba213..049ae0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -70,7 +70,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public UserServer(DrillConfig config, ScanResult classpathScan, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
UserWorker worker, Executor executor) throws DrillbitStartupException {
- super(UserRpcConfig.getMapping(config, executor), alloc.getUnderlyingAllocator(), eventLoopGroup);
+ super(UserRpcConfig.getMapping(config, executor),
+ alloc.getAsByteBufAllocator(),
+ eventLoopGroup);
this.worker = worker;
this.alloc = alloc;
// TODO: move this up
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
# Review note (Drillbit.java): ServiceEngine now receives manager.getBee() in place of
# manager.getDataHandler().
# NOTE(review): the WorkManager hunks in this message do not add getBee(); confirm the accessor already
# exists on WorkManager.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index feebbb2..99523d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -188,7 +188,7 @@ public class Drillbit implements AutoCloseable {
context = new BootStrapContext(config, classpathScan);
manager = new WorkManager(context);
engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context,
- manager.getWorkBus(), manager.getDataHandler(), allowPortHunting);
+ manager.getWorkBus(), manager.getBee(), allowPortHunting);
webServer = new WebServer(config, context.getMetrics(), manager);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
# Review note (ServiceEngine.java): the constructor parameter `DataResponseHandler dataHandler` becomes
# `WorkerBee bee` and is forwarded unchanged to DataConnectionCreator. The imports are updated to match.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d3b4128..e07ca90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -38,9 +38,9 @@ import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.ControllerImpl;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.user.UserWorker;
@@ -58,7 +58,7 @@ public class ServiceEngine implements Closeable{
private final boolean allowPortHunting;
public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context,
- WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) throws DrillbitStartupException {
+ WorkEventBus workBus, WorkerBee bee, boolean allowPortHunting) throws DrillbitStartupException {
final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(
context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
this.userServer = new UserServer(
@@ -69,7 +69,7 @@ public class ServiceEngine implements Closeable{
userWorker,
context.getExecutor());
this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting);
- this.dataPool = new DataConnectionCreator(context, workBus, dataHandler, allowPortHunting);
+ this.dataPool = new DataConnectionCreator(context, workBus, bee, allowPortHunting);
this.config = context.getConfig();
this.allowPortHunting = allowPortHunting;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
# Review note (EasyFormatPlugin.java): the imports are reordered. The scan's OperatorContext is now created
# with newOperatorContext(scan) instead of the two-argument form, whose `false` flag carried the comment
# "ScanBatch is not subject to fragment memory limit".
# NOTE(review): whether scans are now subject to the fragment memory limit depends on the one-argument
# overload, which is not shown in this message. Confirm that this is intended.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 12e00f2..ec3cae8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -38,8 +38,8 @@ import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -53,12 +53,11 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.Path;
-
public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
@@ -161,10 +160,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
int numParts = 0;
- OperatorContext oContext = context.newOperatorContext(scan, false /*
- * ScanBatch is not subject to fragment memory
- * limit
- */);
+ OperatorContext oContext = context.newOperatorContext(scan);
final DrillFileSystem dfs;
try {
dfs = oContext.newFileSystem(fsConf);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
# Review note (ParquetRecordWriter.java): newOperatorContext(writer, true) becomes
# newOperatorContext(writer); the boolean argument is dropped.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 710edd6..8f4f5fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -109,7 +109,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
super();
- this.oContext = context.newOperatorContext(writer, true);
+ this.oContext = context.newOperatorContext(writer);
this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
new ParquetDirectByteBufferAllocator(oContext.getAllocator()), pageSize);
this.partitionColumns = writer.getPartitionColumns();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
# Review note (ParquetScanBatchCreator.java): removes a blank line between imports. Also replaces
# newOperatorContext(rowGroupScan, false /* ScanBatch is not subject to fragment memory limit */) with
# newOperatorContext(rowGroupScan).
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 0cb12f8..afa4fa0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -43,7 +43,6 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet2.DrillParquetReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -70,10 +69,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<SchemaPath> columns = rowGroupScan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
- OperatorContext oContext = context.newOperatorContext(rowGroupScan, false /*
- * ScanBatch is not subject to fragment
- * memory limit
- */);
+ OperatorContext oContext = context.newOperatorContext(rowGroupScan);
List<String[]> partitionColumns = Lists.newArrayList();
List<Integer> selectedPartitionColumns = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
# Review note (MemoryIterator.java): direct_max is now read from DrillConfig.getMaxDirectMemory() instead
# of TopLevelAllocator.MAXIMUM_DIRECT_MEMORY. The import is swapped accordingly.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
index e02b413..e624ada 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -24,7 +24,7 @@ import java.lang.management.MemoryUsage;
import java.util.Iterator;
import java.util.List;
-import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -63,7 +63,7 @@ public class MemoryIterator implements Iterator<Object> {
memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
- memoryInfo.direct_max = TopLevelAllocator.MAXIMUM_DIRECT_MEMORY;
+ memoryInfo.direct_max = DrillConfig.getMaxDirectMemory();
return memoryInfo;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
# Review note (WorkManager.java): removes the DataResponseHandler field, its DataResponseHandlerImpl(bee)
# construction, the getDataHandler() accessor and the two related imports. The WorkerBee field is kept.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 93fd13e..5fd6f1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -37,8 +37,6 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
-import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -75,7 +73,6 @@ public class WorkManager implements AutoCloseable {
private DrillbitContext dContext;
private final ControlMessageHandler controlMessageWorker;
- private final DataResponseHandler dataHandler;
private final UserWorker userWorker;
private final WorkerBee bee;
private final WorkEventBus workBus;
@@ -97,7 +94,6 @@ public class WorkManager implements AutoCloseable {
controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
statusThread = new StatusThread();
- dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()
}
public void start(
@@ -132,10 +128,6 @@ public class WorkManager implements AutoCloseable {
return workBus;
}
- public DataResponseHandler getDataHandler() {
- return dataHandler;
- }
-
public ControlMessageHandler getControlMessageHandler() {
return controlMessageWorker;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
# Review note (AbstractDataCollector.java): batchArrived() no longer copies out-of-memory header batches
# into every buffer. It now returns true only when parentAccounter.decrementAndGet() reaches zero.
# Previously it returned true whenever this collector's own remainingRequired reached zero, even if
# parentAccounter was still positive.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 1579c2b..8a3e5b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -93,26 +92,18 @@ public abstract class AbstractDataCollector implements DataCollector{
@Override
public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException {
- // if we received an out of memory, add an item to all the buffer queues.
- if (batch.getHeader().getIsOutOfMemory()) {
- for (RawBatchBuffer buffer : buffers) {
- buffer.enqueue(batch);
- }
- }
-
// check to see if we have enough fragments reporting to proceed.
- boolean decremented = false;
+ boolean decrementedToZero = false;
if (remainders.compareAndSet(fragmentMap.get(minorFragmentId), 0, 1)) {
int rem = remainingRequired.decrementAndGet();
if (rem == 0) {
- parentAccounter.decrementAndGet();
- decremented = true;
+ decrementedToZero = 0 == parentAccounter.decrementAndGet();
}
}
getBuffer(minorFragmentId).enqueue(batch);
- return decremented;
+ return decrementedToZero;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
# Review note (BaseRawBatchBuffer.java): removes the header-based out-of-memory protocol. enqueue() no longer
# special-cases batches whose header has getIsOutOfMemory() set, and handleOutOfMemory() is deleted. The
# outOfMemory flag is instead set whenever context.isOverMemoryLimit() returns true, and a non-null batch
# is no longer returned early on that condition before upkeep(b). assertAckSent() no longer exempts OOM
# header batches.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 31f2e4a..f15a3e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -88,28 +88,10 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
throw new IOException("Attempted to enqueue batch after finished");
}
}
- if (batch.getHeader().getIsOutOfMemory()) {
- handleOutOfMemory(batch);
- return;
- }
enqueueInner(batch);
}
/**
- * handle the out of memory case
- *
- * @param batch
- */
- protected void handleOutOfMemory(final RawFragmentBatch batch) {
- if (!bufferQueue.checkForOutOfMemory()) {
- logger.debug("Adding OOM message to front of queue. Current queue size: {}", bufferQueue.size());
- bufferQueue.addOomBatch(batch);
- } else {
- logger.debug("ignoring duplicate OOM message");
- }
- }
-
- /**
* implementation specific method to enqueue batch
*
* @param batch
@@ -202,11 +184,11 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
return null;
}
+ if (context.isOverMemoryLimit()) {
+ outOfMemory.set(true);
+ }
+
if (b != null) {
- if (b.getHeader().getIsOutOfMemory()) {
- outOfMemory.set(true);
- return b;
- }
upkeep(b);
@@ -234,7 +216,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
}
private void assertAckSent(RawFragmentBatch batch) {
- assert batch == null || batch.isAckSent() || batch.getHeader().getIsOutOfMemory() : "Ack not sent for batch";
+ assert batch == null || batch.isAckSent() : "Ack not sent for batch";
}
private int decrementStreamCounter() {
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
# Review note (IncomingBuffers.java): adds a volatile `closed` flag and a ReentrantReadWriteLock exposed as two
# AutoCloseableLocks. Per the new field comment, the read lock is shared by incoming batches and the write
# lock is exclusive to the close method. Also renames fragCounts to collectorMap and imports
# IncomingDataBatch.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index b21c61d..a516fad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -21,13 +21,17 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -38,11 +42,22 @@ import com.google.common.collect.Maps;
public class IncomingBuffers implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
+ private volatile boolean closed = false;
private final AtomicInteger streamsRemaining = new AtomicInteger(0);
private final AtomicInteger remainingRequired;
- private final Map<Integer, DataCollector> fragCounts;
+ private final Map<Integer, DataCollector> collectorMap;
private final FragmentContext context;
+ /**
+ * Lock used to manage close and data acceptance. We should only create a local reference to incoming data in the case
+ * that the incoming buffers are !closed. As such, we need to make sure that we aren't in the process of closing the
+ * incoming buffers when data is arriving. The read lock can be shared by many incoming batches but the write lock
+ * must be exclusive to the close method.
+ */
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock sharedIncomingBatchLock = new AutoCloseableLock(lock.readLock());
+ private final AutoCloseableLock exclusiveCloseLock = new AutoCloseableLock(lock.writeLock());
+
public IncomingBuffers(PlanFragment fragment, FragmentContext context) {
this.context = context;
Map<Integer, DataCollector> collectors = Maps.newHashMap();
@@ -56,39 +71,50 @@ public class IncomingBuffers implements AutoCloseable {
}
logger.debug("Came up with a list of {} required fragments. Fragments {}", remainingRequired.get(), collectors);
- fragCounts = ImmutableMap.copyOf(collectors);
+ collectorMap = ImmutableMap.copyOf(collectors);
// Determine the total number of incoming streams that will need to be completed before we are finished.
int totalStreams = 0;
- for (DataCollector bc : fragCounts.values()) {
+ for (DataCollector bc : collectorMap.values()) {
totalStreams += bc.getTotalIncomingFragments();
}
assert totalStreams >= remainingRequired.get() : String.format("Total Streams %d should be more than the minimum number of streams to commence (%d). It isn't.", totalStreams, remainingRequired.get());
streamsRemaining.set(totalStreams);
}
- public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException, IOException {
- // no need to do anything if we've already enabled running.
- // logger.debug("New Batch Arrived {}", batch);
- if (batch.getHeader().getIsOutOfMemory()) {
- for (DataCollector fSet : fragCounts.values()) {
- fSet.batchArrived(0, batch);
+ public boolean batchArrived(final IncomingDataBatch incomingBatch) throws FragmentSetupException, IOException {
+
+ // we want to make sure that we only generate local record batch reference in the case that we're not closed.
+ // Otherwise we would leak memory.
+ try (AutoCloseableLock lock = sharedIncomingBatchLock.open()) {
+ if (closed) {
+ return false;
}
- return false;
- }
- if (batch.getHeader().getIsLastBatch()) {
- streamsRemaining.decrementAndGet();
- }
- int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
- DataCollector fSet = fragCounts.get(sendMajorFragmentId);
- if (fSet == null) {
- throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting. The id was %d. %s", sendMajorFragmentId, Arrays.toString(fragCounts.values().toArray())));
- }
- synchronized (this) {
- boolean decremented = fSet.batchArrived(batch.getHeader().getSendingMinorFragmentId(), batch);
- // we should only return true if remaining required has been decremented and is currently equal to zero.
- return decremented && remainingRequired.get() == 0;
+
+ final int sendMajorFragmentId = incomingBatch.getHeader().getSendingMajorFragmentId();
+ final DataCollector collector = collectorMap.get(sendMajorFragmentId);
+ if (collector == null) {
+ throw new FragmentSetupException(String.format(
+ "We received a major fragment id that we were not expecting. The id was %d. %s", sendMajorFragmentId,
+ Arrays.toString(collectorMap.values().toArray())));
+ }
+
+ // Count a finished stream only once the sender is known to be expected: a batch from an unexpected
+ // sender is rejected above and must not reduce the number of streams isDone() waits for.
+ if (incomingBatch.getHeader().getIsLastBatch()) {
+ streamsRemaining.decrementAndGet();
+ }
+
+ synchronized (collector) {
+ final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator());
+ try {
+ // True only if this batch decremented the remaining required count to zero.
+ return collector.batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch);
+ } finally {
+ // Drop the reference created above even when the collector throws; otherwise the buffer leaks.
+ newRawFragmentBatch.release();
+ }
+ }
+
}
+
}
public int getRemainingRequired() {
@@ -100,19 +126,19 @@ public class IncomingBuffers implements AutoCloseable {
}
+ /**
+ * Returns the raw batch buffers of the collector registered for the given sending major fragment.
+ * NOTE(review): an unknown id yields a NullPointerException here, whereas batchArrived reports the same
+ * situation as a FragmentSetupException.
+ */
public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
- return fragCounts.get(senderMajorFragmentId).getBuffers();
+ return collectorMap.get(senderMajorFragmentId).getBuffers();
}
-
-
-
+ /** Returns true once every expected incoming stream has delivered its last batch. */
public boolean isDone() {
return streamsRemaining.get() < 1;
}
@Override
public void close() throws Exception {
- AutoCloseables.close(fragCounts.values().toArray(new AutoCloseable[0]));
+ // The exclusive lock waits for in-flight batchArrived calls (which hold the shared lock) to finish,
+ // so none of them can create a new local batch reference after the collectors are closed.
+ try (AutoCloseableLock lock = exclusiveCloseLock.open()) {
+ // Once set, batchArrived returns false without touching the collectors.
+ closed = true;
+ AutoCloseables.close(collectorMap.values());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 753c75d..9915b7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -29,11 +29,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -82,9 +80,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
private Path path;
private FSDataOutputStream outputStream;
- public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) throws IOException, OutOfMemoryException {
+ public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) {
super(context, fragmentCount);
- this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true);
+ // NOTE(review): the meaning of the literal 100 argument is not evident here — confirm against getNewChildAllocator.
+ this.allocator = context.getNewChildAllocator(
+ "SpoolingRawBatchBuffer", 100, ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
this.oppositeId = oppositeId;
this.bufferIndex = bufferIndex;
@@ -224,11 +223,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
@Override
protected void upkeep(RawFragmentBatch batch) {
- FragmentRecordBatch header = batch.getHeader();
- if (header.getIsOutOfMemory()) {
+ if (context.isOverMemoryLimit()) {
outOfMemory.set(true);
- return;
}
+
DrillBuf body = batch.getBody();
if (body != null) {
currentSizeInMemory -= body.capacity();
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index ef06ea8..b177770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -65,7 +65,7 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch
@Override
public boolean checkForOutOfMemory() {
+ // Now decided by the fragment context rather than by an out-of-memory flag on the head batch's header.
- return buffer.peekFirst().getHeader().getIsOutOfMemory();
+ return context.isOverMemoryLimit();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 71f9307..7b015a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
@@ -159,7 +159,7 @@ public class Foreman implements Runnable {
this.closeFuture = initiatingClient.getChannel().closeFuture();
closeFuture.addListener(closeListener);
- queryContext = new QueryContext(connection.getSession(), drillbitContext);
+ queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
@@ -407,7 +407,6 @@ public class Foreman implements Runnable {
setupRootFragment(rootPlanFragment, work.getRootOperator());
setupNonRootFragments(planFragments);
- drillbitContext.getAllocator().resetLimits(); // TODO a global effect for this query?!?
moveToState(QueryState.RUNNING, null);
logger.debug("Fragments running.");
@@ -435,7 +434,7 @@ public class Foreman implements Runnable {
final OptionManager optionManager = queryContext.getOptions();
final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
- queryContext.getConfig().getLong(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC));
+ queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
maxAllocPerNode = Math.min(maxAllocPerNode,
optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index ff348cb..20315e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -22,8 +22,8 @@ import java.io.IOException;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
/**
* The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources
@@ -37,7 +37,7 @@ public interface FragmentManager {
* @return True if the fragment has enough incoming data to be able to be run.
* @throws FragmentSetupException, IOException
*/
- boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException;
+ boolean handle(IncomingDataBatch batch) throws FragmentSetupException, IOException;
/**
* Get the fragment runner for this incoming fragment. Note, this can only be requested once.
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 9378e51..b9cf8e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -26,8 +26,8 @@ import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.foreman.ForemanException;
@@ -69,7 +69,7 @@ public class NonRootFragmentManager implements FragmentManager {
- * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
+ * @see org.apache.drill.exec.work.fragment.FragmentManager#handle(org.apache.drill.exec.rpc.data.IncomingDataBatch)
*/
@Override
- public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
+ public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
return buffers.batchArrived(batch);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 0713398..0f7b10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
import org.apache.drill.exec.work.batch.IncomingBuffers;
// TODO a lot of this is the same as NonRootFragmentManager
@@ -46,7 +46,7 @@ public class RootFragmentManager implements FragmentManager {
}
+ /**
+ * Hands the incoming batch to this fragment's IncomingBuffers.
+ *
+ * @return the result of IncomingBuffers#batchArrived for this batch
+ */
@Override
- public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
+ public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
return buffers.batchArrived(batch);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 46f0526..2077d6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -31,9 +31,6 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.io.Files;
-
-import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.scanner.ClassPathScanner;
@@ -43,9 +40,9 @@ import org.apache.drill.exec.ExecTest;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocator;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.memory.RootAllocator;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
@@ -72,11 +69,6 @@ import com.google.common.base.Preconditions;
import com.google.common.io.Files;
import com.google.common.io.Resources;
-import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
public class BaseTestQuery extends ExecTest {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
@@ -268,9 +260,6 @@ public class BaseTestQuery extends ExecTest {
if (bit != null) {
final DrillbitContext drillbitContext = bit.getContext();
final BufferAllocator bufferAllocator = drillbitContext.getAllocator();
- if (!(bufferAllocator instanceof RootAllocator)) {
- throw new IllegalStateException("The DrillbitContext's allocator is not a RootAllocator");
- }
final RootAllocator rootAllocator = (RootAllocator) bufferAllocator;
rootAllocator.verify();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
deleted file mode 100644
index 19613fe..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-import org.apache.drill.exec.testing.Controls;
-import org.apache.drill.exec.testing.ControlsInjectionUtil;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * Run several tpch queries and inject an OutOfMemoryException in ScanBatch that will cause an OUT_OF_MEMORY outcome to
- * be propagated downstream. Make sure the proper "memory error" message is sent to the client.
- */
-@Ignore("Need to add exception site in memory layer that doesn't depend on other Drill code.")
-public class TestAllocationException extends BaseTestQuery {
-
- private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true";
-
- private void testWithException(final String fileName) throws Exception{
- test(SINGLE_MODE);
-
- final String controls = Controls.newBuilder()
- .addException(TopLevelAllocator.class,
- TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE,
- OutOfMemoryException.class,
- 200,
- 1
- ).build();
- ControlsInjectionUtil.setControls(client, controls);
-
- String query = getFile(fileName);
-
- try {
- test(query);
- fail("The query should have failed!");
- } catch(UserException uex) {
- DrillPBError error = uex.getOrCreatePBError(false);
- assertEquals(DrillPBError.ErrorType.RESOURCE, error.getErrorType());
- assertTrue("Error message isn't related to memory error",
- uex.getMessage().contains(UserException.MEMORY_ERROR_MSG));
- }
- }
-
- @Test
- public void testWithOOM() throws Exception{
- testWithException("queries/tpch/01.sql");
- }
-}
http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 2389dc9..322e54a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -18,30 +18,19 @@
package org.apache.drill.exec.memory;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-
+import static org.junit.Assert.fail;
import io.netty.buffer.DrillBuf;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
@@ -52,7 +41,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
import org.apache.drill.exec.proto.BitControl;
-import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.server.Drillbit;
@@ -65,15 +53,16 @@ import org.apache.drill.test.DrillTest;
import org.junit.Test;
import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
import com.google.common.io.Files;
public class TestAllocators extends DrillTest {
private static final Properties TEST_CONFIGURATIONS = new Properties() {
{
- put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "14000000");
- put(AccountorImpl.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
- put(AccountorImpl.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
+ // Caps the root allocator of the Drillbit started in testAllocators at 14 MB.
+ put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "14000000");
}
};
@@ -95,8 +84,7 @@ public class TestAllocators extends DrillTest {
final Properties props = new Properties() {
{
- put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
- put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+ put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1000000");
}
};
final DrillConfig config = DrillConfig.create(props);
@@ -140,8 +128,7 @@ public class TestAllocators extends DrillTest {
public void testClearBitVector() {
final Properties props = new Properties() {
{
- put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
- put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+ put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1000000");
}
};
final DrillConfig config = DrillConfig.create(props);
@@ -163,147 +150,145 @@ public class TestAllocators extends DrillTest {
public void testTransfer() throws Exception {
final Properties props = new Properties() {
{
- put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
- put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+ // NOTE(review): presumably sized so buf1 (1,000,000 bytes) and buf2 (1,000 bytes) fit under one root — confirm.
+ put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1049600");
}
};
final DrillConfig config = DrillConfig.create(props);
BufferAllocator a = RootAllocatorFactory.newRoot(config);
- BufferAllocator b = RootAllocatorFactory.newRoot(config);
+ // Two children of the same root; ownership of buf1 is moved from a1 to a2 below.
+ BufferAllocator a1 = a.newChildAllocator("a1", 0, Integer.MAX_VALUE);
+ BufferAllocator a2 = a.newChildAllocator("a2", 0, Integer.MAX_VALUE);
- DrillBuf buf1 = a.buffer(1_000_000);
- DrillBuf buf2 = b.buffer(1_000);
- b.takeOwnership(buf1);
+ DrillBuf buf1 = a1.buffer(1_000_000);
+ DrillBuf buf2 = a2.buffer(1_000);
+ // buf3 is the buffer returned by the transfer — presumably a2's handle on buf1's memory; confirm.
+ DrillBuf buf3 = buf1.transferOwnership(a2).buffer;
+ // Release every buffer, then close the child allocators before their parent.
buf1.release();
buf2.release();
+ buf3.release();
+ a1.close();
+ a2.close();
a.close();
- b.close();
}
@Test
public void testAllocators() throws Exception {
// Setup a drillbit (initializes a root allocator)
final DrillConfig config = DrillConfig.create(TEST_CONFIGURATIONS);
- final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
- final Drillbit bit = new Drillbit(config, serviceSet);
- bit.run();
- final DrillbitContext bitContext = bit.getContext();
- FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
- StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
-
- // Create a few Fragment Contexts
-
- BitControl.PlanFragment.Builder pfBuilder1=BitControl.PlanFragment.newBuilder();
- pfBuilder1.setMemInitial(1500000);
- BitControl.PlanFragment pf1=pfBuilder1.build();
- BitControl.PlanFragment.Builder pfBuilder2=BitControl.PlanFragment.newBuilder();
- pfBuilder2.setMemInitial(500000);
- BitControl.PlanFragment pf2=pfBuilder1.build();
-
- FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, null, functionRegistry);
- FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, null, functionRegistry);
-
- // Get a few physical operators. Easiest way is to read a physical plan.
- PhysicalPlanReader planReader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(bitContext, storageRegistry);
- PhysicalPlan plan = planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8));
- List<PhysicalOperator> physicalOperators = plan.getSortedOperators();
- Iterator<PhysicalOperator> physicalOperatorIterator = physicalOperators.iterator();
-
- PhysicalOperator physicalOperator1 = physicalOperatorIterator.next();
- PhysicalOperator physicalOperator2 = physicalOperatorIterator.next();
- PhysicalOperator physicalOperator3 = physicalOperatorIterator.next();
- PhysicalOperator physicalOperator4 = physicalOperatorIterator.next();
- PhysicalOperator physicalOperator5 = physicalOperatorIterator.next();
- PhysicalOperator physicalOperator6 = physicalOperatorIterator.next();
-
- // Create some bogus Operator profile defs and stats to create operator contexts
- OpProfileDef def;
- OperatorStats stats;
-
- //Use some bogus operator type to create a new operator context.
- def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
- OperatorContext.getChildCount(physicalOperator1));
- stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
-
-
- // Add a couple of Operator Contexts
- // Initial allocation = 1000000 bytes for all operators
- OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1, true);
- DrillBuf b11=oContext11.getAllocator().buffer(1000000);
-
- OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats, true);
- DrillBuf b12=oContext12.getAllocator().buffer(500000);
-
- OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3, true);
-
- def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
- OperatorContext.getChildCount(physicalOperator4));
- stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
- OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true);
- DrillBuf b22=oContext22.getAllocator().buffer(2000000);
-
- // New Fragment begins
- BitControl.PlanFragment.Builder pfBuilder3=BitControl.PlanFragment.newBuilder();
- pfBuilder3.setMemInitial(1000000);
- BitControl.PlanFragment pf3=pfBuilder3.build();
-
- FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
-
- // New fragment starts an operator that allocates an amount within the limit
- def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
- OperatorContext.getChildCount(physicalOperator5));
- stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
- OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true);
-
- DrillBuf b31a = oContext31.getAllocator().buffer(200000);
-
- //Previously running operator completes
- b22.release();
- ((AutoCloseable) oContext22).close();
-
- // Fragment 3 asks for more and fails
- boolean outOfMem = false;
- try {
- oContext31.getAllocator().buffer(44000000);
- fail("Fragment 3 should fail to allocate buffer");
- } catch (OutOfMemoryRuntimeException e) {
- outOfMem = true; // Expected.
- }
- assertTrue(outOfMem);
-
- // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
- OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false);
- try {
- DrillBuf b32 = oContext32.getAllocator().buffer(4400000);
- b32.release();
- } catch (OutOfMemoryException e) {
- fail("Fragment 3 failed to allocate buffer");
- } finally {
- closeOp(oContext32);
- }
- b11.release();
- closeOp(oContext11);
- b12.release();
- closeOp(oContext12);
- closeOp(oContext21);
- b31a.release();
- closeOp(oContext31);
-
- fragmentContext1.close();
- fragmentContext2.close();
- fragmentContext3.close();
-
- bit.close();
- serviceSet.close();
-
-/*
- // ---------------------------------------- DEBUG ----------------------------------
- assertEquals(0, UnsafeDirectLittleEndian.getBufferCount());
- // ---------------------------------------- DEBUG ----------------------------------
-*/
+ try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ final Drillbit bit = new Drillbit(config, serviceSet)) {
+ bit.run();
+ final DrillbitContext bitContext = bit.getContext();
+ FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
+ StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
+
+ // Create a few Fragment Contexts
+
+ BitControl.PlanFragment.Builder pfBuilder1 = BitControl.PlanFragment.newBuilder();
+ pfBuilder1.setMemInitial(1500000);
+ BitControl.PlanFragment pf1 = pfBuilder1.build();
+ BitControl.PlanFragment.Builder pfBuilder2 = BitControl.PlanFragment.newBuilder();
+ pfBuilder2.setMemInitial(500000);
+ // Build pf2 from its own builder; building it from pfBuilder1 silently discarded the 500000 setting above.
+ BitControl.PlanFragment pf2 = pfBuilder2.build();
+
+ FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, null, functionRegistry);
+ FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, null, functionRegistry);
+
+ // Get a few physical operators. Easiest way is to read a physical plan.
+ PhysicalPlanReader planReader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(bitContext,
+ storageRegistry);
+ PhysicalPlan plan = planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile),
+ Charsets.UTF_8));
+ List<PhysicalOperator> physicalOperators = plan.getSortedOperators();
+ Iterator<PhysicalOperator> physicalOperatorIterator = physicalOperators.iterator();
+
+ PhysicalOperator physicalOperator1 = physicalOperatorIterator.next();
+ PhysicalOperator physicalOperator2 = physicalOperatorIterator.next();
+ PhysicalOperator physicalOperator3 = physicalOperatorIterator.next();
+ PhysicalOperator physicalOperator4 = physicalOperatorIterator.next();
+ PhysicalOperator physicalOperator5 = physicalOperatorIterator.next();
+ PhysicalOperator physicalOperator6 = physicalOperatorIterator.next();
+
+ // Create some bogus Operator profile defs and stats to create operator contexts
+ OpProfileDef def;
+ OperatorStats stats;
+
+ // Use some bogus operator type to create a new operator context.
+ def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+ OperatorContext.getChildCount(physicalOperator1));
+ stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
+
+ // Add a couple of Operator Contexts
+ // Initial allocation = 1000000 bytes for all operators
+ OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1);
+ DrillBuf b11 = oContext11.getAllocator().buffer(1000000);
+
+ OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats);
+ DrillBuf b12 = oContext12.getAllocator().buffer(500000);
+
+ OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);
+
+ def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
+ OperatorContext.getChildCount(physicalOperator4));
+ stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
+ OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);
+ DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
+
+ // New Fragment begins
+ BitControl.PlanFragment.Builder pfBuilder3 = BitControl.PlanFragment.newBuilder();
+ pfBuilder3.setMemInitial(1000000);
+ BitControl.PlanFragment pf3 = pfBuilder3.build();
+
+ FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
+
+ // New fragment starts an operator that allocates an amount within the limit
+ def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
+ OperatorContext.getChildCount(physicalOperator5));
+ stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
+ OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
+
+ DrillBuf b31a = oContext31.getAllocator().buffer(200000);
+
+ // Previously running operator completes
+ b22.release();
+ ((AutoCloseable) oContext22).close();
+
+ // Fragment 3 asks for more than the 14 MB TOP_LEVEL_MAX_ALLOC in TEST_CONFIGURATIONS and fails
+ boolean outOfMem = false;
+ try {
+ oContext31.getAllocator().buffer(44000000);
+ fail("Fragment 3 should fail to allocate buffer");
+ } catch (OutOfMemoryException e) {
+ outOfMem = true; // Expected.
+ }
+ assertTrue(outOfMem);
+
+ // A new operator in fragment 3 asks for a smaller amount (4.4 MB, below the 14 MB top-level limit) and is
+ // expected to succeed. Operators are no longer exempted from fragment limits: the boolean exemption argument
+ // to newOperatorContext was removed by this change.
+ OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6);
+ try {
+ DrillBuf b32 = oContext32.getAllocator().buffer(4400000);
+ b32.release();
+ } catch (OutOfMemoryException e) {
+ fail("Fragment 3 failed to allocate buffer");
+ } finally {
+ closeOp(oContext32);
+ }
+
+ b11.release();
+ closeOp(oContext11);
+ b12.release();
+ closeOp(oContext12);
+ closeOp(oContext21);
+ b31a.release();
+ closeOp(oContext31);
+
+ fragmentContext1.close();
+ fragmentContext2.close();
+ fragmentContext3.close();
+
+ }
}
private void closeOp(OperatorContext c) throws Exception {