You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/12/22 16:06:35 UTC

[12/13] drill git commit: DRILL-4134: Allocator Improvements

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 56e2ff2..03118d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -24,9 +24,9 @@ import io.netty.channel.socket.SocketChannel;
 import io.netty.util.concurrent.GenericFutureListener;
 
 import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.drill.exec.exception.FragmentSetupException;
-import org.apache.drill.exec.memory.AllocatorClosedException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData.BitClientHandshake;
 import org.apache.drill.exec.proto.BitData.BitServerHandshake;
@@ -44,7 +44,7 @@ import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.fragment.FragmentManager;
 
 import com.google.protobuf.MessageLite;
@@ -55,16 +55,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
   private volatile ProxyCloseHandler proxyCloseHandler;
   private final BootStrapContext context;
   private final WorkEventBus workBus;
-  private final DataResponseHandler dataHandler;
+  private final WorkerBee bee;
 
-  public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
+  public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus,
+      WorkerBee bee) {
     super(
         DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
-        context.getAllocator().getUnderlyingAllocator(),
+        alloc.getAsByteBufAllocator(),
         context.getBitLoopGroup());
     this.context = context;
     this.workBus = workBus;
-    this.dataHandler = dataHandler;
+    this.bee = bee;
   }
 
   @Override
@@ -106,9 +107,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     };
   }
 
-  private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
-
-
   private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
     return FragmentHandle.newBuilder()
         .setQueryId(batch.getQueryId())
@@ -117,34 +115,50 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
         .build();
   }
 
+  /**
+   * Offers {@code batch} to the fragment manager for each receiving-fragment index in
+   * [minorStart, minorStopExclusive) and starts any fragment whose manager reports it can now run.
+   */
+  private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException,
+      IOException {
+    for (int minor = minorStart; minor < minorStopExclusive; minor++) {
+      final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor));
+      if (manager == null) {
+        // A missing manager means the query already terminated. We can simply drop this data.
+        continue;
+      }
+
+      final boolean canRun = manager.handle(batch);
+      if (canRun) {
+        // If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a
+        // single true. This is guaranteed by the interface.
+        bee.startFragmentPendingRemote(manager);
+      }
+    }
+  }
 
   @Override
   protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException {
     assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
 
     final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
-    final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
+    final AckSender ack = new AckSender(sender);
+
 
-    AckSender ack = new AckSender(sender);
     // increment so we don't get false returns.
     ack.increment();
+
     try {
 
-      if(body == null){
+      final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack);
+      final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
 
-        for(int minor = 0; minor < targetCount; minor++){
-          FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
-          if(manager != null){
-            ack.increment();
-            dataHandler.handle(manager, fragmentBatch, null, ack);
-          }
-        }
+      // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst
+      // multiple fragments.
+      final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
+      submit(batch, firstOwner, targetCount);
+      submit(batch, 0, firstOwner);
 
-      }else{
-        for (int minor = 0; minor < targetCount; minor++) {
-          send(fragmentBatch, (DrillBuf) body, minor, ack);
-        }
-      }
     } catch (IOException | FragmentSetupException e) {
       logger.error("Failure while getting fragment manager. {}",
           QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
@@ -159,51 +173,6 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     }
   }
 
-  private void send(final FragmentRecordBatch fragmentBatch, final DrillBuf body, final int minor, final AckSender ack)
-      throws FragmentSetupException, IOException {
-
-    final FragmentManager manager = workBus.getFragmentManager(getHandle(fragmentBatch, minor));
-    if (manager == null) {
-      return;
-    }
-
-    final BufferAllocator allocator = manager.getFragmentContext().getAllocator();
-    final Pointer<DrillBuf> out = new Pointer<>();
-
-    final boolean withinMemoryEnvelope;
-
-    try {
-      withinMemoryEnvelope = allocator.shareOwnership((DrillBuf) body, out);
-    } catch(final AllocatorClosedException e) {
-      /*
-       * It can happen that between the time we get the fragment manager and we
-       * try to transfer this buffer to it, the fragment may have been cancelled
-       * and closed. When that happens, the allocator will be closed when we
-       * attempt this. That just means we can drop this data on the floor, since
-       * the receiver no longer exists (and no longer needs it).
-       *
-       * Note that checking manager.isCancelled() before we attempt this isn't enough,
-       * because of timing: it may still be cancelled between that check and
-       * the attempt to do the memory transfer. To double check ourselves, we
-       * do check manager.isCancelled() here, after the fact; it shouldn't
-       * change again after its allocator has been closed.
-       */
-      assert manager.isCancelled();
-      return;
-    }
-
-    if (!withinMemoryEnvelope) {
-      // if we over reserved, we need to add poison pill before batch.
-      dataHandler.handle(manager, OOM_FRAGMENT, null, null);
-    }
-
-    ack.increment();
-    dataHandler.handle(manager, fragmentBatch, out.value, ack);
-
-    // make sure to release the reference count we have to the new buffer.
-    // dataHandler.handle should have taken any ownership it needed.
-    out.value.release();
-  }
 
   private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
 
@@ -226,7 +195,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
     return new OutOfMemoryHandler() {
       @Override
       public void handle() {
-        dataHandler.informOutOfMemory();
+        logger.error("Out of memory in RPC layer.");
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
new file mode 100644
index 0000000..a9bc305
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/IncomingDataBatch.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
+import org.apache.drill.exec.record.RawFragmentBatch;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An incoming batch of data. The data is held by the original allocator. Any use of the associated data must go
+ * through {@link #newRawFragmentBatch(BufferAllocator)}.
+ */
+public class IncomingDataBatch {
+
+  private final FragmentRecordBatch header;
+  private final DrillBuf body;
+  private final AckSender sender;
+
+  /**
+   * Create a new batch. Does not impact reference counts of body.
+   *
+   * @param header Batch header. Must not be null.
+   * @param body Data body. Could be null.
+   * @param sender AckSender to use for underlying RawFragmentBatches. Must not be null.
+   */
+  public IncomingDataBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
+    Preconditions.checkNotNull(header);
+    Preconditions.checkNotNull(sender);
+    this.header = header;
+    this.body = body;
+    this.sender = sender;
+  }
+
+  /**
+   * Create a new RawFragmentBatch based on this incoming data batch that is transferred into the provided allocator.
+   * Also increments the AckSender to expect one additional return message.
+   * When this batch has no body, the returned batch carries a null buffer and no transfer takes place.
+   *
+   * @param allocator Target allocator that should be associated with data underlying this batch.
+   * @return The newly created RawFragmentBatch
+   */
+  public RawFragmentBatch newRawFragmentBatch(final BufferAllocator allocator) {
+    final DrillBuf transferredBuffer = body == null ? null : body.transferOwnership(allocator).buffer;
+    sender.increment();
+    return new RawFragmentBatch(header, transferredBuffer, sender);
+  }
+
+  /**
+   * @return The batch header supplied at construction.
+   */
+  public FragmentRecordBatch getHeader() {
+    return header;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index ddba213..049ae0c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -70,7 +70,9 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
 
   public UserServer(DrillConfig config, ScanResult classpathScan, BufferAllocator alloc, EventLoopGroup eventLoopGroup,
       UserWorker worker, Executor executor) throws DrillbitStartupException {
-    super(UserRpcConfig.getMapping(config, executor), alloc.getUnderlyingAllocator(), eventLoopGroup);
+    super(UserRpcConfig.getMapping(config, executor),
+        alloc.getAsByteBufAllocator(),
+        eventLoopGroup);
     this.worker = worker;
     this.alloc = alloc;
     // TODO: move this up

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index feebbb2..99523d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -188,7 +188,7 @@ public class Drillbit implements AutoCloseable {
     context = new BootStrapContext(config, classpathScan);
     manager = new WorkManager(context);
     engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context,
-        manager.getWorkBus(), manager.getDataHandler(), allowPortHunting);
+        manager.getWorkBus(), manager.getBee(), allowPortHunting);
 
     webServer = new WebServer(config, context.getMetrics(), manager);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index d3b4128..e07ca90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -38,9 +38,9 @@ import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.ControllerImpl;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
 import org.apache.drill.exec.rpc.user.UserServer;
 import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
 import org.apache.drill.exec.work.batch.ControlMessageHandler;
 import org.apache.drill.exec.work.user.UserWorker;
 
@@ -58,7 +58,7 @@ public class ServiceEngine implements Closeable{
   private final boolean allowPortHunting;
 
   public ServiceEngine(ControlMessageHandler controlMessageHandler, UserWorker userWorker, BootStrapContext context,
-      WorkEventBus workBus, DataResponseHandler dataHandler, boolean allowPortHunting) throws DrillbitStartupException {
+      WorkEventBus workBus, WorkerBee bee, boolean allowPortHunting) throws DrillbitStartupException {
     final EventLoopGroup eventLoopGroup = TransportCheck.createEventLoopGroup(
         context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS), "UserServer-");
     this.userServer = new UserServer(
@@ -69,7 +69,7 @@ public class ServiceEngine implements Closeable{
         userWorker,
         context.getExecutor());
     this.controller = new ControllerImpl(context, controlMessageHandler, allowPortHunting);
-    this.dataPool = new DataConnectionCreator(context, workBus, dataHandler, allowPortHunting);
+    this.dataPool = new DataConnectionCreator(context, workBus, bee, allowPortHunting);
     this.config = context.getConfig();
     this.allowPortHunting = allowPortHunting;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 12e00f2..ec3cae8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -38,8 +38,8 @@ import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.physical.impl.ScanBatch;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
-import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.record.CloseableRecordBatch;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.AbstractRecordReader;
@@ -53,12 +53,11 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.schedule.CompleteFileWork;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 
-import org.apache.hadoop.fs.Path;
-
 public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements FormatPlugin {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyFormatPlugin.class);
 
@@ -161,10 +160,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     }
 
     int numParts = 0;
-    OperatorContext oContext = context.newOperatorContext(scan, false /*
-                                                                       * ScanBatch is not subject to fragment memory
-                                                                       * limit
-                                                                       */);
+    OperatorContext oContext = context.newOperatorContext(scan);
     final DrillFileSystem dfs;
     try {
       dfs = oContext.newFileSystem(fsConf);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 710edd6..8f4f5fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -109,7 +109,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
   public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
     super();
-    this.oContext = context.newOperatorContext(writer, true);
+    this.oContext = context.newOperatorContext(writer);
     this.codecFactory = CodecFactory.createDirectCodecFactory(writer.getFormatPlugin().getFsConf(),
         new ParquetDirectByteBufferAllocator(oContext.getAllocator()), pageSize);
     this.partitionColumns = writer.getPartitionColumns();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 0cb12f8..afa4fa0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -43,7 +43,6 @@ import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
@@ -70,10 +69,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
       .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
     List<SchemaPath> columns = rowGroupScan.getColumns();
     List<RecordReader> readers = Lists.newArrayList();
-    OperatorContext oContext = context.newOperatorContext(rowGroupScan, false /*
-                                                                               * ScanBatch is not subject to fragment
-                                                                               * memory limit
-                                                                               */);
+    OperatorContext oContext = context.newOperatorContext(rowGroupScan);
 
     List<String[]> partitionColumns = Lists.newArrayList();
     List<Integer> selectedPartitionColumns = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
index e02b413..e624ada 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/MemoryIterator.java
@@ -24,7 +24,7 @@ import java.lang.management.MemoryUsage;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
@@ -63,7 +63,7 @@ public class MemoryIterator implements Iterator<Object> {
 
 
     memoryInfo.direct_current = context.getDrillbitContext().getAllocator().getAllocatedMemory();
-    memoryInfo.direct_max = TopLevelAllocator.MAXIMUM_DIRECT_MEMORY;
+    memoryInfo.direct_max = DrillConfig.getMaxDirectMemory();
     return memoryInfo;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 93fd13e..5fd6f1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -37,8 +37,6 @@ import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
 import org.apache.drill.exec.rpc.data.DataConnectionCreator;
-import org.apache.drill.exec.rpc.data.DataResponseHandler;
-import org.apache.drill.exec.rpc.data.DataResponseHandlerImpl;
 import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.store.sys.PStoreProvider;
@@ -75,7 +73,6 @@ public class WorkManager implements AutoCloseable {
   private DrillbitContext dContext;
 
   private final ControlMessageHandler controlMessageWorker;
-  private final DataResponseHandler dataHandler;
   private final UserWorker userWorker;
   private final WorkerBee bee;
   private final WorkEventBus workBus;
@@ -97,7 +94,6 @@ public class WorkManager implements AutoCloseable {
     controlMessageWorker = new ControlMessageHandler(bee); // TODO getFragmentRunner(), getForemanForQueryId()
     userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
     statusThread = new StatusThread();
-    dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()
   }
 
   public void start(
@@ -132,10 +128,6 @@ public class WorkManager implements AutoCloseable {
     return workBus;
   }
 
-  public DataResponseHandler getDataHandler() {
-    return dataHandler;
-  }
-
   public ControlMessageHandler getControlMessageHandler() {
     return controlMessageWorker;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 1579c2b..8a3e5b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerArray;
 
 import org.apache.drill.common.AutoCloseables;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -93,26 +92,18 @@ public abstract class AbstractDataCollector implements DataCollector{
   @Override
   public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch)  throws IOException {
 
-    // if we received an out of memory, add an item to all the buffer queues.
-    if (batch.getHeader().getIsOutOfMemory()) {
-      for (RawBatchBuffer buffer : buffers) {
-        buffer.enqueue(batch);
-      }
-    }
-
     // check to see if we have enough fragments reporting to proceed.
-    boolean decremented = false;
+    boolean decrementedToZero = false;
     if (remainders.compareAndSet(fragmentMap.get(minorFragmentId), 0, 1)) {
       int rem = remainingRequired.decrementAndGet();
       if (rem == 0) {
-        parentAccounter.decrementAndGet();
-        decremented = true;
+        decrementedToZero = 0 == parentAccounter.decrementAndGet();
       }
     }
 
     getBuffer(minorFragmentId).enqueue(batch);
 
-    return decremented;
+    return decrementedToZero;
   }
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 31f2e4a..f15a3e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -88,28 +88,10 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
         throw new IOException("Attempted to enqueue batch after finished");
       }
     }
-    if (batch.getHeader().getIsOutOfMemory()) {
-      handleOutOfMemory(batch);
-      return;
-    }
     enqueueInner(batch);
   }
 
   /**
-   * handle the out of memory case
-   *
-   * @param batch
-   */
-  protected void handleOutOfMemory(final RawFragmentBatch batch) {
-    if (!bufferQueue.checkForOutOfMemory()) {
-      logger.debug("Adding OOM message to front of queue. Current queue size: {}", bufferQueue.size());
-      bufferQueue.addOomBatch(batch);
-    } else {
-      logger.debug("ignoring duplicate OOM message");
-    }
-  }
-
-  /**
    * implementation specific method to enqueue batch
    *
    * @param batch
@@ -202,11 +184,11 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
       return null;
     }
 
+    if (context.isOverMemoryLimit()) {
+      outOfMemory.set(true);
+    }
+
     if (b != null) {
-      if (b.getHeader().getIsOutOfMemory()) {
-        outOfMemory.set(true);
-        return b;
-      }
 
       upkeep(b);
 
@@ -234,7 +216,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
   }
 
   private void assertAckSent(RawFragmentBatch batch) {
-    assert batch == null || batch.isAckSent() || batch.getHeader().getIsOutOfMemory() : "Ack not sent for batch";
+    assert batch == null || batch.isAckSent() : "Ack not sent for batch";
   }
 
   private int decrementStreamCounter() {

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index b21c61d..a516fad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -21,13 +21,17 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.Collector;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -38,11 +42,22 @@ import com.google.common.collect.Maps;
 public class IncomingBuffers implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IncomingBuffers.class);
 
+  private volatile boolean closed = false;
   private final AtomicInteger streamsRemaining = new AtomicInteger(0);
   private final AtomicInteger remainingRequired;
-  private final Map<Integer, DataCollector> fragCounts;
+  private final Map<Integer, DataCollector> collectorMap;
   private final FragmentContext context;
 
+  /**
+   * Lock used to manage close and data acceptance. We should only create a local reference to incoming data in the case
+   * that the incoming buffers are !closed. As such, we need to make sure that we aren't in the process of closing the
+   * incoming buffers when data is arriving. The read lock can be shared by many incoming batches but the write lock
+   * must be exclusive to the close method.
+   */
+  private final ReadWriteLock lock = new ReentrantReadWriteLock();
+  private final AutoCloseableLock sharedIncomingBatchLock = new AutoCloseableLock(lock.readLock());
+  private final AutoCloseableLock exclusiveCloseLock = new AutoCloseableLock(lock.writeLock());
+
   public IncomingBuffers(PlanFragment fragment, FragmentContext context) {
     this.context = context;
     Map<Integer, DataCollector> collectors = Maps.newHashMap();
@@ -56,39 +71,50 @@ public class IncomingBuffers implements AutoCloseable {
     }
 
     logger.debug("Came up with a list of {} required fragments.  Fragments {}", remainingRequired.get(), collectors);
-    fragCounts = ImmutableMap.copyOf(collectors);
+    collectorMap = ImmutableMap.copyOf(collectors);
 
     // Determine the total number of incoming streams that will need to be completed before we are finished.
     int totalStreams = 0;
-    for (DataCollector bc : fragCounts.values()) {
+    for (DataCollector bc : collectorMap.values()) {
       totalStreams += bc.getTotalIncomingFragments();
     }
     assert totalStreams >= remainingRequired.get() : String.format("Total Streams %d should be more than the minimum number of streams to commence (%d).  It isn't.", totalStreams, remainingRequired.get());
     streamsRemaining.set(totalStreams);
   }
 
-  public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException, IOException {
-    // no need to do anything if we've already enabled running.
-    // logger.debug("New Batch Arrived {}", batch);
-    if (batch.getHeader().getIsOutOfMemory()) {
-      for (DataCollector fSet : fragCounts.values()) {
-        fSet.batchArrived(0, batch);
+  public boolean batchArrived(final IncomingDataBatch incomingBatch) throws FragmentSetupException, IOException {
+
+    // We want to make sure that we only generate a local record batch reference in the case that we're not closed.
+    // Otherwise we would leak memory.
+    try (AutoCloseableLock lock = sharedIncomingBatchLock.open()) {
+      if (closed) {
+        return false;
       }
-      return false;
-    }
-    if (batch.getHeader().getIsLastBatch()) {
-      streamsRemaining.decrementAndGet();
-    }
-    int sendMajorFragmentId = batch.getHeader().getSendingMajorFragmentId();
-    DataCollector fSet = fragCounts.get(sendMajorFragmentId);
-    if (fSet == null) {
-      throw new FragmentSetupException(String.format("We received a major fragment id that we were not expecting.  The id was %d. %s", sendMajorFragmentId, Arrays.toString(fragCounts.values().toArray())));
-    }
-    synchronized (this) {
-      boolean decremented = fSet.batchArrived(batch.getHeader().getSendingMinorFragmentId(), batch);
-      // we should only return true if remaining required has been decremented and is currently equal to zero.
-      return decremented && remainingRequired.get() == 0;
+
+      if (incomingBatch.getHeader().getIsLastBatch()) {
+        streamsRemaining.decrementAndGet();
+      }
+
+      final int sendMajorFragmentId = incomingBatch.getHeader().getSendingMajorFragmentId();
+      DataCollector collector = collectorMap.get(sendMajorFragmentId);
+      if (collector == null) {
+        throw new FragmentSetupException(String.format(
+            "We received a major fragment id that we were not expecting.  The id was %d. %s", sendMajorFragmentId,
+            Arrays.toString(collectorMap.values().toArray())));
+      }
+
+      synchronized (collector) {
+        final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator());
+        boolean decrementedToZero = collector
+            .batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch);
+        newRawFragmentBatch.release();
+
+        // we should only return true if remaining required has been decremented and is currently equal to zero.
+        return decrementedToZero;
+      }
+
     }
+
   }
 
   public int getRemainingRequired() {
@@ -100,19 +126,19 @@ public class IncomingBuffers implements AutoCloseable {
   }
 
   public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
-    return fragCounts.get(senderMajorFragmentId).getBuffers();
+    return collectorMap.get(senderMajorFragmentId).getBuffers();
   }
 
-
-
-
   public boolean isDone() {
     return streamsRemaining.get() < 1;
   }
 
   @Override
   public void close() throws Exception {
-    AutoCloseables.close(fragCounts.values().toArray(new AutoCloseable[0]));
+    try (AutoCloseableLock lock = exclusiveCloseLock.open()) {
+      closed = true;
+      AutoCloseables.close(collectorMap.values());
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 753c75d..9915b7e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -29,11 +29,9 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.proto.ExecProtos;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -82,9 +80,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
   private Path path;
   private FSDataOutputStream outputStream;
 
-  public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) throws IOException, OutOfMemoryException {
+  public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) {
     super(context, fragmentCount);
-    this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true);
+    this.allocator = context.getNewChildAllocator(
+        "SpoolingRawBatchBufer", 100, ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
     this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
     this.oppositeId = oppositeId;
     this.bufferIndex = bufferIndex;
@@ -224,11 +223,10 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB
 
   @Override
   protected void upkeep(RawFragmentBatch batch) {
-    FragmentRecordBatch header = batch.getHeader();
-    if (header.getIsOutOfMemory()) {
+    if (context.isOverMemoryLimit()) {
       outOfMemory.set(true);
-      return;
     }
+
     DrillBuf body = batch.getBody();
     if (body != null) {
       currentSizeInMemory -= body.capacity();

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index ef06ea8..b177770 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -65,7 +65,7 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch
 
     @Override
     public boolean checkForOutOfMemory() {
-      return buffer.peekFirst().getHeader().getIsOutOfMemory();
+      return context.isOverMemoryLimit();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 71f9307..7b015a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -44,7 +44,7 @@ import org.apache.drill.exec.coord.DistributedSemaphore;
 import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.opt.BasicOptimizer;
@@ -159,7 +159,7 @@ public class Foreman implements Runnable {
     this.closeFuture = initiatingClient.getChannel().closeFuture();
     closeFuture.addListener(closeListener);
 
-    queryContext = new QueryContext(connection.getSession(), drillbitContext);
+    queryContext = new QueryContext(connection.getSession(), drillbitContext, queryId);
     queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
         stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
 
@@ -407,7 +407,6 @@ public class Foreman implements Runnable {
     setupRootFragment(rootPlanFragment, work.getRootOperator());
 
     setupNonRootFragments(planFragments);
-    drillbitContext.getAllocator().resetLimits(); // TODO a global effect for this query?!?
 
     moveToState(QueryState.RUNNING, null);
     logger.debug("Fragments running.");
@@ -435,7 +434,7 @@ public class Foreman implements Runnable {
       final OptionManager optionManager = queryContext.getOptions();
       final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
       long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
-          queryContext.getConfig().getLong(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC));
+          queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
       maxAllocPerNode = Math.min(maxAllocPerNode,
           optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
       final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index ff348cb..20315e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 
 /**
  * The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources
@@ -37,7 +37,7 @@ public interface FragmentManager {
    * @return True if the fragment has enough incoming data to be able to be run.
    * @throws FragmentSetupException, IOException
    */
-  boolean handle(RawFragmentBatch batch) throws FragmentSetupException, IOException;
+  boolean handle(IncomingDataBatch batch) throws FragmentSetupException, IOException;
 
   /**
    * Get the fragment runner for this incoming fragment. Note, this can only be requested once.

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 9378e51..b9cf8e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -26,8 +26,8 @@ import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 import org.apache.drill.exec.work.foreman.ForemanException;
@@ -69,7 +69,7 @@ public class NonRootFragmentManager implements FragmentManager {
    * @see org.apache.drill.exec.work.fragment.FragmentHandler#handle(org.apache.drill.exec.rpc.RemoteConnection.ConnectionThrottle, org.apache.drill.exec.record.RawFragmentBatch)
    */
   @Override
-  public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
+  public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
     return buffers.batchArrived(batch);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 0713398..0f7b10e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -24,8 +24,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.record.RawFragmentBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.IncomingDataBatch;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
 // TODO a lot of this is the same as NonRootFragmentManager
@@ -46,7 +46,7 @@ public class RootFragmentManager implements FragmentManager {
   }
 
   @Override
-  public boolean handle(final RawFragmentBatch batch) throws FragmentSetupException, IOException {
+  public boolean handle(final IncomingDataBatch batch) throws FragmentSetupException, IOException {
     return buffers.batchArrived(batch);
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index 46f0526..2077d6e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -31,9 +31,6 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.io.Files;
-
-import org.apache.drill.common.DrillAutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.scanner.ClassPathScanner;
@@ -43,9 +40,9 @@ import org.apache.drill.exec.ExecTest;
 import org.apache.drill.exec.client.DrillClient;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.memory.RootAllocatorFactory;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.memory.RootAllocator;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
 import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
 import org.apache.drill.exec.proto.UserBitShared.QueryType;
@@ -72,11 +69,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import com.google.common.io.Resources;
 
-import static org.hamcrest.core.StringContains.containsString;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.fail;
-
 public class BaseTestQuery extends ExecTest {
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class);
 
@@ -268,9 +260,6 @@ public class BaseTestQuery extends ExecTest {
         if (bit != null) {
           final DrillbitContext drillbitContext = bit.getContext();
           final BufferAllocator bufferAllocator = drillbitContext.getAllocator();
-          if (!(bufferAllocator instanceof RootAllocator)) {
-            throw new IllegalStateException("The DrillbitContext's allocator is not a RootAllocator");
-          }
           final RootAllocator rootAllocator = (RootAllocator) bufferAllocator;
           rootAllocator.verify();
         }

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java b/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
deleted file mode 100644
index 19613fe..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/TestAllocationException.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.memory.TopLevelAllocator;
-import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
-import org.apache.drill.exec.testing.Controls;
-import org.apache.drill.exec.testing.ControlsInjectionUtil;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * Run several tpch queries and inject an OutOfMemoryException in ScanBatch that will cause an OUT_OF_MEMORY outcome to
- * be propagated downstream. Make sure the proper "memory error" message is sent to the client.
- */
-@Ignore("Need to add exception site in memory layer that doesn't depend on other Drill code.")
-public class TestAllocationException extends BaseTestQuery {
-
-  private static final String SINGLE_MODE = "ALTER SESSION SET `planner.disable_exchanges` = true";
-
-  private void testWithException(final String fileName) throws Exception{
-    test(SINGLE_MODE);
-
-    final String controls = Controls.newBuilder()
-      .addException(TopLevelAllocator.class,
-        TopLevelAllocator.CHILD_BUFFER_INJECTION_SITE,
-        OutOfMemoryException.class,
-        200,
-        1
-      ).build();
-    ControlsInjectionUtil.setControls(client, controls);
-
-    String query = getFile(fileName);
-
-    try {
-      test(query);
-      fail("The query should have failed!");
-    } catch(UserException uex) {
-      DrillPBError error = uex.getOrCreatePBError(false);
-      assertEquals(DrillPBError.ErrorType.RESOURCE, error.getErrorType());
-      assertTrue("Error message isn't related to memory error",
-        uex.getMessage().contains(UserException.MEMORY_ERROR_MSG));
-    }
-  }
-
-  @Test
-  public void testWithOOM() throws Exception{
-    testWithException("queries/tpch/01.sql");
-  }
-}

http://git-wip-us.apache.org/repos/asf/drill/blob/809f4620/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 2389dc9..322e54a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -18,30 +18,19 @@
 
 package org.apache.drill.exec.memory;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import com.google.common.collect.Lists;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-
+import static org.junit.Assert.fail;
 import io.netty.buffer.DrillBuf;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 
-import io.netty.buffer.UnsafeDirectLittleEndian;
-
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.util.FileUtils;
 import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.OpProfileDef;
@@ -52,7 +41,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.proto.BitControl;
-import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.Drillbit;
@@ -65,15 +53,16 @@ import org.apache.drill.test.DrillTest;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class TestAllocators extends DrillTest {
 
   private static final Properties TEST_CONFIGURATIONS = new Properties() {
     {
-      put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "14000000");
-      put(AccountorImpl.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
-      put(AccountorImpl.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
+      put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "14000000");
+      // put(AccountorImpl.ENABLE_FRAGMENT_MEMORY_LIMIT, "true");
+      // put(AccountorImpl.FRAGMENT_MEM_OVERCOMMIT_FACTOR, "1.1");
     }
   };
 
@@ -95,8 +84,7 @@ public class TestAllocators extends DrillTest {
 
     final Properties props = new Properties() {
       {
-        put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
-        put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+        put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1000000");
       }
     };
     final DrillConfig config = DrillConfig.create(props);
@@ -140,8 +128,7 @@ public class TestAllocators extends DrillTest {
   public void testClearBitVector() {
     final Properties props = new Properties() {
       {
-        put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
-        put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+        put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1000000");
       }
     };
     final DrillConfig config = DrillConfig.create(props);
@@ -163,147 +150,145 @@ public class TestAllocators extends DrillTest {
   public void testTransfer() throws Exception {
     final Properties props = new Properties() {
       {
-        put(TopLevelAllocator.TOP_LEVEL_MAX_ALLOC, "1000000");
-        put(TopLevelAllocator.ERROR_ON_MEMORY_LEAK, "true");
+        put(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC, "1049600");
       }
     };
     final DrillConfig config = DrillConfig.create(props);
     BufferAllocator a = RootAllocatorFactory.newRoot(config);
-    BufferAllocator b = RootAllocatorFactory.newRoot(config);
+    BufferAllocator a1 = a.newChildAllocator("a1", 0, Integer.MAX_VALUE);
+    BufferAllocator a2 = a.newChildAllocator("a2", 0, Integer.MAX_VALUE);
 
-    DrillBuf buf1 = a.buffer(1_000_000);
-    DrillBuf buf2 = b.buffer(1_000);
-    b.takeOwnership(buf1);
+    DrillBuf buf1 = a1.buffer(1_000_000);
+    DrillBuf buf2 = a2.buffer(1_000);
+    DrillBuf buf3 = buf1.transferOwnership(a2).buffer;
 
     buf1.release();
     buf2.release();
+    buf3.release();
 
+    a1.close();
+    a2.close();
     a.close();
-    b.close();
   }
 
   @Test
   public void testAllocators() throws Exception {
     // Setup a drillbit (initializes a root allocator)
     final DrillConfig config = DrillConfig.create(TEST_CONFIGURATIONS);
-    final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
-    final Drillbit bit = new Drillbit(config, serviceSet);
-    bit.run();
-    final DrillbitContext bitContext = bit.getContext();
-    FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
-    StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
-
-    // Create a few Fragment Contexts
-
-    BitControl.PlanFragment.Builder pfBuilder1=BitControl.PlanFragment.newBuilder();
-    pfBuilder1.setMemInitial(1500000);
-    BitControl.PlanFragment pf1=pfBuilder1.build();
-    BitControl.PlanFragment.Builder pfBuilder2=BitControl.PlanFragment.newBuilder();
-    pfBuilder2.setMemInitial(500000);
-    BitControl.PlanFragment pf2=pfBuilder1.build();
-
-    FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, null, functionRegistry);
-    FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, null, functionRegistry);
-
-    // Get a few physical operators. Easiest way is to read a physical plan.
-    PhysicalPlanReader planReader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(bitContext, storageRegistry);
-    PhysicalPlan plan = planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile), Charsets.UTF_8));
-    List<PhysicalOperator> physicalOperators = plan.getSortedOperators();
-    Iterator<PhysicalOperator> physicalOperatorIterator = physicalOperators.iterator();
-
-    PhysicalOperator physicalOperator1 = physicalOperatorIterator.next();
-    PhysicalOperator physicalOperator2 = physicalOperatorIterator.next();
-    PhysicalOperator physicalOperator3 = physicalOperatorIterator.next();
-    PhysicalOperator physicalOperator4 = physicalOperatorIterator.next();
-    PhysicalOperator physicalOperator5 = physicalOperatorIterator.next();
-    PhysicalOperator physicalOperator6 = physicalOperatorIterator.next();
-
-    // Create some bogus Operator profile defs and stats to create operator contexts
-    OpProfileDef def;
-    OperatorStats stats;
-
-    //Use some bogus operator type to create a new operator context.
-    def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
-        OperatorContext.getChildCount(physicalOperator1));
-    stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
-
-
-    // Add a couple of Operator Contexts
-    // Initial allocation = 1000000 bytes for all operators
-    OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1, true);
-    DrillBuf b11=oContext11.getAllocator().buffer(1000000);
-
-    OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats, true);
-    DrillBuf b12=oContext12.getAllocator().buffer(500000);
-
-    OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3, true);
-
-    def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
-        OperatorContext.getChildCount(physicalOperator4));
-    stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
-    OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true);
-    DrillBuf b22=oContext22.getAllocator().buffer(2000000);
-
-    // New Fragment begins
-    BitControl.PlanFragment.Builder pfBuilder3=BitControl.PlanFragment.newBuilder();
-    pfBuilder3.setMemInitial(1000000);
-    BitControl.PlanFragment pf3=pfBuilder3.build();
-
-    FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
-
-    // New fragment starts an operator that allocates an amount within the limit
-    def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
-        OperatorContext.getChildCount(physicalOperator5));
-    stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
-    OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true);
-
-    DrillBuf b31a = oContext31.getAllocator().buffer(200000);
-
-    //Previously running operator completes
-    b22.release();
-    ((AutoCloseable) oContext22).close();
-
-    // Fragment 3 asks for more and fails
-    boolean outOfMem = false;
-    try {
-      oContext31.getAllocator().buffer(44000000);
-      fail("Fragment 3 should fail to allocate buffer");
-    } catch (OutOfMemoryRuntimeException e) {
-      outOfMem = true; // Expected.
-    }
-    assertTrue(outOfMem);
-
-    // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
-    OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false);
-    try {
-      DrillBuf b32 = oContext32.getAllocator().buffer(4400000);
-      b32.release();
-    } catch (OutOfMemoryException e) {
-      fail("Fragment 3 failed to allocate buffer");
-    } finally {
-      closeOp(oContext32);
-    }
 
-    b11.release();
-    closeOp(oContext11);
-    b12.release();
-    closeOp(oContext12);
-    closeOp(oContext21);
-    b31a.release();
-    closeOp(oContext31);
-
-    fragmentContext1.close();
-    fragmentContext2.close();
-    fragmentContext3.close();
-
-    bit.close();
-    serviceSet.close();
-
-/*
-    // ---------------------------------------- DEBUG ----------------------------------
-    assertEquals(0, UnsafeDirectLittleEndian.getBufferCount());
-    // ---------------------------------------- DEBUG ----------------------------------
-*/
+    try (final RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+        final Drillbit bit = new Drillbit(config, serviceSet)) {
+      ;
+      bit.run();
+      final DrillbitContext bitContext = bit.getContext();
+      FunctionImplementationRegistry functionRegistry = bitContext.getFunctionImplementationRegistry();
+      StoragePluginRegistry storageRegistry = new StoragePluginRegistry(bitContext);
+
+      // Create a few Fragment Contexts
+
+      BitControl.PlanFragment.Builder pfBuilder1 = BitControl.PlanFragment.newBuilder();
+      pfBuilder1.setMemInitial(1500000);
+      BitControl.PlanFragment pf1 = pfBuilder1.build();
+      BitControl.PlanFragment.Builder pfBuilder2 = BitControl.PlanFragment.newBuilder();
+      pfBuilder2.setMemInitial(500000);
+      BitControl.PlanFragment pf2 = pfBuilder1.build();
+
+      FragmentContext fragmentContext1 = new FragmentContext(bitContext, pf1, null, functionRegistry);
+      FragmentContext fragmentContext2 = new FragmentContext(bitContext, pf2, null, functionRegistry);
+
+      // Get a few physical operators. Easiest way is to read a physical plan.
+      PhysicalPlanReader planReader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(bitContext,
+          storageRegistry);
+      PhysicalPlan plan = planReader.readPhysicalPlan(Files.toString(FileUtils.getResourceAsFile(planFile),
+          Charsets.UTF_8));
+      List<PhysicalOperator> physicalOperators = plan.getSortedOperators();
+      Iterator<PhysicalOperator> physicalOperatorIterator = physicalOperators.iterator();
+
+      PhysicalOperator physicalOperator1 = physicalOperatorIterator.next();
+      PhysicalOperator physicalOperator2 = physicalOperatorIterator.next();
+      PhysicalOperator physicalOperator3 = physicalOperatorIterator.next();
+      PhysicalOperator physicalOperator4 = physicalOperatorIterator.next();
+      PhysicalOperator physicalOperator5 = physicalOperatorIterator.next();
+      PhysicalOperator physicalOperator6 = physicalOperatorIterator.next();
+
+      // Create some bogus Operator profile defs and stats to create operator contexts
+      OpProfileDef def;
+      OperatorStats stats;
+
+      // Use some bogus operator type to create a new operator context.
+      def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+          OperatorContext.getChildCount(physicalOperator1));
+      stats = fragmentContext1.getStats().newOperatorStats(def, fragmentContext1.getAllocator());
+
+      // Add a couple of Operator Contexts
+      // Initial allocation = 1000000 bytes for all operators
+      OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1);
+      DrillBuf b11 = oContext11.getAllocator().buffer(1000000);
+
+      OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats);
+      DrillBuf b12 = oContext12.getAllocator().buffer(500000);
+
+      OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3);
+
+      def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
+          OperatorContext.getChildCount(physicalOperator4));
+      stats = fragmentContext2.getStats().newOperatorStats(def, fragmentContext2.getAllocator());
+      OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats);
+      DrillBuf b22 = oContext22.getAllocator().buffer(2000000);
+
+      // New Fragment begins
+      BitControl.PlanFragment.Builder pfBuilder3 = BitControl.PlanFragment.newBuilder();
+      pfBuilder3.setMemInitial(1000000);
+      BitControl.PlanFragment pf3 = pfBuilder3.build();
+
+      FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
+
+      // New fragment starts an operator that allocates an amount within the limit
+      def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
+          OperatorContext.getChildCount(physicalOperator5));
+      stats = fragmentContext3.getStats().newOperatorStats(def, fragmentContext3.getAllocator());
+      OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats);
+
+      DrillBuf b31a = oContext31.getAllocator().buffer(200000);
+
+      // Previously running operator completes
+      b22.release();
+      ((AutoCloseable) oContext22).close();
+
+      // Fragment 3 asks for more and fails
+      boolean outOfMem = false;
+      try {
+        oContext31.getAllocator().buffer(44000000);
+        fail("Fragment 3 should fail to allocate buffer");
+      } catch (OutOfMemoryException e) {
+        outOfMem = true; // Expected.
+      }
+      assertTrue(outOfMem);
+
+      // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
+      OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6);
+      try {
+        DrillBuf b32 = oContext32.getAllocator().buffer(4400000);
+        b32.release();
+      } catch (OutOfMemoryException e) {
+        fail("Fragment 3 failed to allocate buffer");
+      } finally {
+        closeOp(oContext32);
+      }
+
+      b11.release();
+      closeOp(oContext11);
+      b12.release();
+      closeOp(oContext12);
+      closeOp(oContext21);
+      b31a.release();
+      closeOp(oContext31);
+
+      fragmentContext1.close();
+      fragmentContext2.close();
+      fragmentContext3.close();
+
+    }
   }
 
   private void closeOp(OperatorContext c) throws Exception {