You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/05/27 21:05:10 UTC

[drill] branch master updated: DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 245df6f8e6 DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)
245df6f8e6 is described below

commit 245df6f8e69c909fd30810a400417842e91273a7
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Sat May 28 00:05:01 2022 +0300

    DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)
    
    * DRILL-8236: Move HttpHelperFunctions to use JSON2 reader
    
    * DRILL-8236: Move HttpHelperFunctions to use JSON2 reader
    
    Co-authored-by: Charles Givre <cg...@apache.org>
---
 .../exec/store/http/udfs/HttpHelperFunctions.java  | 52 +++++++++-------------
 .../exec/store/http/TestHttpUDFFunctions.java      | 16 ++++++-
 .../apache/drill/exec/ops/BufferManagerImpl.java   |  4 ++
 .../apache/drill/exec/ops/FragmentContextImpl.java |  8 ++++
 .../org/apache/drill/exec/ops/QueryContext.java    |  8 ++++
 .../org/apache/drill/exec/ops/UdfUtilities.java    |  4 ++
 .../drill/exec/physical/impl/scan/ReaderState.java | 41 ++++++++---------
 .../store/easy/json/loader/JsonLoaderImpl.java     | 12 +++++
 .../org/apache/drill/test/OperatorFixture.java     |  8 ++++
 .../org/apache/drill/test/QueryBatchIterator.java  |  3 ++
 .../java/org/apache/drill/test/QueryBuilder.java   | 30 ++++++-------
 .../java/org/apache/drill/test/QueryResultSet.java | 47 +++++++++----------
 .../apache/drill/exec/memory/BaseAllocator.java    | 24 +++++-----
 .../org/apache/drill/exec/ops/BufferManager.java   |  3 ++
 14 files changed, 150 insertions(+), 110 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 25f9f071ae..c669677f70 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -18,7 +18,6 @@
 
 package org.apache.drill.exec.store.http.udfs;
 
-import io.netty.buffer.DrillBuf;
 import org.apache.drill.exec.expr.DrillSimpleFunc;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate;
 import org.apache.drill.exec.expr.annotations.Output;
@@ -26,6 +25,7 @@ import org.apache.drill.exec.expr.annotations.Param;
 import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -52,22 +52,16 @@ public class HttpHelperFunctions {
     OptionManager options;
 
     @Inject
-    DrillBuf buffer;
+    ResultSetLoader loader;
 
     @Workspace
-    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
 
     @Override
     public void setup() {
-      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
-        .defaultSchemaPathColumns()
-        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
-        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
-        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
-        .build();
-
-      jsonReader.setIgnoreJSONParseErrors(
-        options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+      jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+        .resultSetLoader(loader)
+        .standardOptions(options);
     }
 
     @Override
@@ -102,10 +96,11 @@ public class HttpHelperFunctions {
       }
 
       try {
-        jsonReader.setSource(results);
-        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
-        jsonReader.write(writer);
-        buffer = jsonReader.getWorkBuf();
+        jsonLoaderBuilder.fromString(results);
+        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+        loader.startBatch();
+        jsonLoader.readBatch();
+        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
@@ -134,10 +129,10 @@ public class HttpHelperFunctions {
     DrillbitContext drillbitContext;
 
     @Inject
-    DrillBuf buffer;
+    ResultSetLoader loader;
 
     @Workspace
-    org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+    org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
 
     @Workspace
     org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
@@ -147,15 +142,9 @@ public class HttpHelperFunctions {
 
     @Override
     public void setup() {
-      jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
-        .defaultSchemaPathColumns()
-        .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
-        .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
-        .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
-        .build();
-
-      jsonReader.setIgnoreJSONParseErrors(
-        options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+      jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+        .resultSetLoader(loader)
+        .standardOptions(options);
 
       String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
       // Get the plugin name and endpoint name
@@ -208,10 +197,11 @@ public class HttpHelperFunctions {
       }
 
       try {
-        jsonReader.setSource(results);
-        jsonReader.setIgnoreJSONParseErrors(true);  // Reduce number of errors
-        jsonReader.write(writer);
-        buffer = jsonReader.getWorkBuf();
+        jsonLoaderBuilder.fromString(results);
+        org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+        loader.startBatch();
+        jsonLoader.readBatch();
+        loader.close();
       } catch (Exception e) {
         throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
       }
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 9e32116d9f..24d73baa02 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -18,13 +18,17 @@
 
 package org.apache.drill.exec.store.http;
 
+import ch.qos.logback.classic.Level;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
 import org.apache.drill.common.logical.security.PlainCredentialsProvider;
 import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
 import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
 import org.apache.drill.exec.store.http.util.SimpleHttp;
 import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
 import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -32,6 +36,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.shaded.guava.com.google.common.io.Files;
 import org.apache.drill.test.ClusterFixture;
 import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -51,10 +56,17 @@ public class TestHttpUDFFunctions extends ClusterTest {
   private static final int MOCK_SERVER_PORT = 47771;
   private static String TEST_JSON_RESPONSE;
   private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
-
+  protected static LogFixture logFixture;
+  private final static Level CURRENT_LOG_LEVEL = Level.INFO;
 
   @BeforeClass
   public static void setup() throws Exception {
+    logFixture = LogFixture.builder()
+      .toConsole()
+      .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
+      .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
+      .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
+      .build();
     startCluster(ClusterFixture.builder(dirTestWatcher));
     TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
 
@@ -133,7 +145,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
       client.queryBuilder().sql(sql).run();
       fail();
     } catch (Exception e) {
-      assertTrue(e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
+      assertTrue(e.getMessage(), e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index d0dcace8d2..c42e7de93a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -44,6 +44,10 @@ public class BufferManagerImpl implements BufferManager {
     managedBuffers.clear();
   }
 
+  public BufferAllocator getAllocator() {
+    return allocator;
+  }
+
   @Override
   public DrillBuf replace(DrillBuf old, int newSize) {
     if (managedBuffers.remove(old.memoryAddress()) == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 889c251628..dafbf883d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -47,6 +47,9 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -347,6 +350,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
     return context;
   }
 
+  @Override
+  public ResultSetLoader getResultSetLoader() {
+    return new ResultSetLoaderImpl(bufferManager.getAllocator(), new ResultSetOptions());
+  }
+
   @Override
   public DrillbitEndpoint getForemanEndpoint() {
     return fragment.getForeman();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 0b37ce1aeb..57ba2da6ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -32,6 +32,9 @@ import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
 import org.apache.drill.exec.expr.holders.ValueHolder;
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DrillOperatorTable;
 import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -321,6 +324,11 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
     return drillbitContext;
   }
 
+  @Override
+  public ResultSetLoader getResultSetLoader() {
+    return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+  }
+
   @Override
   public PartitionExplorer getPartitionExplorer() {
     return new PartitionExplorerImpl(getRootSchema());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index cee74ff90e..98637610c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.PartitionExplorer;
@@ -44,6 +45,7 @@ public interface UdfUtilities {
           .put(OptionManager.class, "getOptions")
           .put(BufferManager.class, "getManagedBufferManager")
           .put(DrillbitContext.class, "getDrillbitContext")
+          .put(ResultSetLoader.class, "getResultSetLoader")
           .build();
 
 
@@ -98,6 +100,8 @@ public interface UdfUtilities {
    */
   DrillbitContext getDrillbitContext();
 
+  ResultSetLoader getResultSetLoader();
+
   /**
    * Works with value holders cache which holds constant value and its wrapper by type.
    * If value is absent uses holderInitializer to create holder and adds it to cache.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index ac83b493a7..90c11fe970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -344,29 +344,26 @@ class ReaderState {
    */
   protected boolean next() {
     switch (state) {
-    case LOOK_AHEAD:
-    case LOOK_AHEAD_WITH_EOF:
-      // Use batch previously read.
-      assert lookahead != null;
-      lookahead.exchange(scanOp.containerAccessor.container());
-      assert lookahead.getRecordCount() == 0;
-      lookahead = null;
-      if (state == State.LOOK_AHEAD_WITH_EOF) {
-        state = State.EOF;
-      } else {
-        state = State.ACTIVE;
+      case LOOK_AHEAD:
+      case LOOK_AHEAD_WITH_EOF:
+        // Use batch previously read.
+        assert lookahead != null;
+        lookahead.exchange(scanOp.containerAccessor.container());
+        assert lookahead.getRecordCount() == 0;
+        lookahead = null;
+        if (state == State.LOOK_AHEAD_WITH_EOF) {
+          state = State.EOF;
+        } else {
+          state = State.ACTIVE;
+        }
+        return true;
+      case ACTIVE:
+        return readBatch();
+      case EOF:
+        return false;
+      default:
+        throw new IllegalStateException("Unexpected state: " + state);
       }
-      return true;
-
-    case ACTIVE:
-      return readBatch();
-
-    case EOF:
-      return false;
-
-    default:
-      throw new IllegalStateException("Unexpected state: " + state);
-    }
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index e5755cb07b..a4ee14f075 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,10 +20,12 @@ package org.apache.drill.exec.store.easy.json.loader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Reader;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -189,6 +191,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
       return this;
     }
 
+    public JsonLoaderBuilder fromString(String jsonString) {
+      try (InputStream targetStream = IOUtils.toInputStream(jsonString, Charset.defaultCharset())) {
+        return fromStream(targetStream);
+      } catch (IOException e) {
+        throw UserException.dataReadError(e)
+          .message("Could not read JSON string: " + jsonString)
+          .build(logger);
+      }
+    }
+
     public JsonLoaderBuilder fromReader(Reader reader) {
       this.reader = reader;
       return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index d73d2527df..4d945c3df6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,6 +18,9 @@
 package org.apache.drill.test;
 
 import org.apache.drill.exec.alias.AliasRegistryProvider;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.metastore.MetastoreRegistry;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
@@ -244,6 +247,11 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
       return null;
     }
 
+    @Override
+    public ResultSetLoader getResultSetLoader() {
+      return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+    }
+
     @Override
     public ExecutorState getExecutorState() {
       return executorState;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
index 2bace961c8..d127ac40bd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
@@ -156,6 +157,8 @@ public class QueryBatchIterator implements UpstreamSource, AutoCloseable {
       QueryEvent event = listener.get();
       if (event.type == QueryEvent.Type.EOF) {
         state = State.EOF;
+      } else if (event.type == QueryEvent.Type.ERROR) {
+        throw DrillRuntimeException.create("Closed with outstanding buffers allocated");
       }
     }
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index fb1fe1549c..41b6ecea11 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -694,21 +694,21 @@ public class QueryBuilder {
       QueryEvent event = listener.get();
       switch (event.type)
       {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        event.batch.release();
-        break;
-      case EOF:
-        state = event.state;
-        break loop;
-      case ERROR:
-        throw event.error;
-      case QUERY_ID:
-        queryId = event.queryId;
-        break;
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
+        case BATCH:
+          batchCount++;
+          recordCount += event.batch.getHeader().getRowCount();
+          event.batch.release();
+          break;
+        case EOF:
+          state = event.state;
+          break loop;
+        case ERROR:
+          throw event.error;
+        case QUERY_ID:
+          queryId = event.queryId;
+          break;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
       }
     }
     long end = System.currentTimeMillis();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index d8e2da16e4..880c05e759 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -58,32 +58,27 @@ public class QueryResultSet {
     }
     while (true) {
       QueryEvent event = listener.get();
-      switch (event.type)
-      {
-      case BATCH:
-        batchCount++;
-        recordCount += event.batch.getHeader().getRowCount();
-        loader.load(event.batch.getHeader().getDef(), event.batch.getData());
-        event.batch.release();
-        return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
-
-      case EOF:
-        state = event.state;
-        eof = true;
-        return null;
-
-      case ERROR:
-        state = event.state;
-        eof = true;
-        throw event.error;
-
-      case QUERY_ID:
-        queryId = event.queryId;
-        continue;
-
-      default:
-        throw new IllegalStateException("Unexpected event: " + event.type);
-      }
+      switch (event.type) {
+        case BATCH:
+          batchCount++;
+          recordCount += event.batch.getHeader().getRowCount();
+          loader.load(event.batch.getHeader().getDef(), event.batch.getData());
+          event.batch.release();
+          return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
+        case EOF:
+          state = event.state;
+          eof = true;
+          return null;
+        case ERROR:
+          state = event.state;
+          eof = true;
+          throw event.error;
+        case QUERY_ID:
+          queryId = event.queryId;
+          continue;
+        default:
+          throw new IllegalStateException("Unexpected event: " + event.type);
+        }
     }
   }
 
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 717ef29693..a9270423ad 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -57,7 +57,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
    * because the buffer is used multiple times by an operator.
    */
 
-  private static final int IO_BUFFER_SIZE = 32*1024;
+  private static final int IO_BUFFER_SIZE = 32 * 1024;
   private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
 
   private final BaseAllocator parentAllocator;
@@ -500,13 +500,13 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
         if (allocatedCount > 0) {
           throw new IllegalStateException(
               String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
-                  name, allocatedCount, toString()));
+                  name, allocatedCount, this));
         }
 
         if (reservations.size() != 0) {
           throw new IllegalStateException(
               String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
-                  toString()));
+                this));
         }
 
       }
@@ -535,8 +535,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
   @Override
   public String toString() {
-    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
-        : Verbosity.BASIC;
+    final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC;
     final StringBuilder sb = new StringBuilder();
     print(sb, 0, verbosity);
     return sb.toString();
@@ -562,8 +561,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   /**
    * Rounds up the provided value to the nearest power of two.
    *
-   * @param val
-   *          An integer value.
+   * @param val An integer value.
    * @return The closest power of two of that value.
    */
   public static int nextPowerOfTwo(int val) {
@@ -578,8 +576,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
   /**
    * Rounds up the provided value to the nearest power of two.
    *
-   * @param val
-   *          An integer long value.
+   * @param val An integer long value.
    * @return The closest power of two of that value.
    */
   public static long longNextPowerOfTwo(long val) {
@@ -622,7 +619,6 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
     }
 
     synchronized (DEBUG_LOCK) {
-
       final long allocated = getAllocatedMemory();
 
       // verify my direct descendants
@@ -689,9 +685,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
         sb.append("allocator[");
         sb.append(name);
         sb.append("]\nallocated: ");
-        sb.append(Long.toString(allocated));
+        sb.append(allocated);
         sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
-        sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+        sb.append(allocated - (bufferTotal + reservedTotal + childTotal));
         sb.append('\n');
 
         if (bufferTotal != 0) {
@@ -703,14 +699,14 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
 
         if (childTotal != 0) {
           sb.append("child total: ");
-          sb.append(Long.toString(childTotal));
+          sb.append(childTotal);
           sb.append('\n');
 
           for (final BaseAllocator childAllocator : childSet) {
             sb.append("child allocator[");
             sb.append(childAllocator.name);
             sb.append("] owned ");
-            sb.append(Long.toString(childAllocator.getAllocatedMemory()));
+            sb.append(childAllocator.getAllocatedMemory());
             sb.append('\n');
           }
         }
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 4345a82c81..df4390feae 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
 
 /**
  * Manages a list of {@link DrillBuf}s that can be reallocated as needed. Upon
@@ -62,5 +63,7 @@ public interface BufferManager extends AutoCloseable {
    */
   public DrillBuf getManagedBuffer(int size);
 
+  BufferAllocator getAllocator();
+
   public void close();
 }