You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/05/27 21:05:10 UTC
[drill] branch master updated: DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 245df6f8e6 DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)
245df6f8e6 is described below
commit 245df6f8e69c909fd30810a400417842e91273a7
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Sat May 28 00:05:01 2022 +0300
DRILL-8236: Move HttpHelperFunctions to use JSON2 reader (#2566)
* DRILL-8236: Move HttpHelperFunctions to use JSON2 reader
* DRILL-8236: Move HttpHelperFunctions to use JSON2 reader
Co-authored-by: Charles Givre <cg...@apache.org>
---
.../exec/store/http/udfs/HttpHelperFunctions.java | 52 +++++++++-------------
.../exec/store/http/TestHttpUDFFunctions.java | 16 ++++++-
.../apache/drill/exec/ops/BufferManagerImpl.java | 4 ++
.../apache/drill/exec/ops/FragmentContextImpl.java | 8 ++++
.../org/apache/drill/exec/ops/QueryContext.java | 8 ++++
.../org/apache/drill/exec/ops/UdfUtilities.java | 4 ++
.../drill/exec/physical/impl/scan/ReaderState.java | 41 ++++++++---------
.../store/easy/json/loader/JsonLoaderImpl.java | 12 +++++
.../org/apache/drill/test/OperatorFixture.java | 8 ++++
.../org/apache/drill/test/QueryBatchIterator.java | 3 ++
.../java/org/apache/drill/test/QueryBuilder.java | 30 ++++++-------
.../java/org/apache/drill/test/QueryResultSet.java | 47 +++++++++----------
.../apache/drill/exec/memory/BaseAllocator.java | 24 +++++-----
.../org/apache/drill/exec/ops/BufferManager.java | 3 ++
14 files changed, 150 insertions(+), 110 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index 25f9f071ae..c669677f70 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.http.udfs;
-import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.expr.DrillSimpleFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.Output;
@@ -26,6 +25,7 @@ import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -52,22 +52,16 @@ public class HttpHelperFunctions {
OptionManager options;
@Inject
- DrillBuf buffer;
+ ResultSetLoader loader;
@Workspace
- org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
@Override
public void setup() {
- jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
- .defaultSchemaPathColumns()
- .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
- .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
- .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
- .build();
-
- jsonReader.setIgnoreJSONParseErrors(
- options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+ jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+ .resultSetLoader(loader)
+ .standardOptions(options);
}
@Override
@@ -102,10 +96,11 @@ public class HttpHelperFunctions {
}
try {
- jsonReader.setSource(results);
- jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
- jsonReader.write(writer);
- buffer = jsonReader.getWorkBuf();
+ jsonLoaderBuilder.fromString(results);
+ org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+ loader.startBatch();
+ jsonLoader.readBatch();
+ loader.close();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
@@ -134,10 +129,10 @@ public class HttpHelperFunctions {
DrillbitContext drillbitContext;
@Inject
- DrillBuf buffer;
+ ResultSetLoader loader;
@Workspace
- org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader;
+ org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder jsonLoaderBuilder;
@Workspace
org.apache.drill.exec.store.http.HttpStoragePlugin plugin;
@@ -147,15 +142,9 @@ public class HttpHelperFunctions {
@Override
public void setup() {
- jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader.Builder(buffer)
- .defaultSchemaPathColumns()
- .readNumbersAsDouble(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READ_NUMBERS_AS_DOUBLE).bool_val)
- .allTextMode(options.getOption(org.apache.drill.exec.ExecConstants.JSON_ALL_TEXT_MODE).bool_val)
- .enableNanInf(options.getOption(org.apache.drill.exec.ExecConstants.JSON_READER_NAN_INF_NUMBERS).bool_val)
- .build();
-
- jsonReader.setIgnoreJSONParseErrors(
- options.getBoolean(org.apache.drill.exec.ExecConstants.JSON_READER_SKIP_INVALID_RECORDS_FLAG));
+ jsonLoaderBuilder = new org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder()
+ .resultSetLoader(loader)
+ .standardOptions(options);
String schemaPath = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(rawInput.start, rawInput.end, rawInput.buffer);
// Get the plugin name and endpoint name
@@ -208,10 +197,11 @@ public class HttpHelperFunctions {
}
try {
- jsonReader.setSource(results);
- jsonReader.setIgnoreJSONParseErrors(true); // Reduce number of errors
- jsonReader.write(writer);
- buffer = jsonReader.getWorkBuf();
+ jsonLoaderBuilder.fromString(results);
+ org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
+ loader.startBatch();
+ jsonLoader.readBatch();
+ loader.close();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 9e32116d9f..24d73baa02 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -18,13 +18,17 @@
package org.apache.drill.exec.store.http;
+import ch.qos.logback.classic.Level;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
@@ -32,6 +36,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.LogFixture;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -51,10 +56,17 @@ public class TestHttpUDFFunctions extends ClusterTest {
private static final int MOCK_SERVER_PORT = 47771;
private static String TEST_JSON_RESPONSE;
private static String DUMMY_URL = "http://localhost:" + MOCK_SERVER_PORT;
-
+ protected static LogFixture logFixture;
+ private final static Level CURRENT_LOG_LEVEL = Level.INFO;
@BeforeClass
public static void setup() throws Exception {
+ logFixture = LogFixture.builder()
+ .toConsole()
+ .logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
+ .logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
+ .logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
+ .build();
startCluster(ClusterFixture.builder(dirTestWatcher));
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -133,7 +145,7 @@ public class TestHttpUDFFunctions extends ClusterTest {
client.queryBuilder().sql(sql).run();
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
+ assertTrue(e.getMessage(), e.getMessage().contains("FUNCTION ERROR: nope is not a valid plugin."));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index d0dcace8d2..c42e7de93a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -44,6 +44,10 @@ public class BufferManagerImpl implements BufferManager {
managedBuffers.clear();
}
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
@Override
public DrillBuf replace(DrillBuf old, int newSize) {
if (managedBuffers.remove(old.memoryAddress()) == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index 889c251628..dafbf883d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -47,6 +47,9 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -347,6 +350,11 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
return context;
}
+ @Override
+ public ResultSetLoader getResultSetLoader() {
+ return new ResultSetLoaderImpl(bufferManager.getAllocator(), new ResultSetOptions());
+ }
+
@Override
public DrillbitEndpoint getForemanEndpoint() {
return fragment.getForeman();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 0b37ce1aeb..57ba2da6ba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -32,6 +32,9 @@ import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.expr.holders.ValueHolder;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -321,6 +324,11 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return drillbitContext;
}
+ @Override
+ public ResultSetLoader getResultSetLoader() {
+ return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+ }
+
@Override
public PartitionExplorer getPartitionExplorer() {
return new PartitionExplorerImpl(getRootSchema());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
index cee74ff90e..98637610c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/UdfUtilities.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.holders.ValueHolder;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.PartitionExplorer;
@@ -44,6 +45,7 @@ public interface UdfUtilities {
.put(OptionManager.class, "getOptions")
.put(BufferManager.class, "getManagedBufferManager")
.put(DrillbitContext.class, "getDrillbitContext")
+ .put(ResultSetLoader.class, "getResultSetLoader")
.build();
@@ -98,6 +100,8 @@ public interface UdfUtilities {
*/
DrillbitContext getDrillbitContext();
+ ResultSetLoader getResultSetLoader();
+
/**
* Works with value holders cache which holds constant value and its wrapper by type.
* If value is absent uses holderInitializer to create holder and adds it to cache.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
index ac83b493a7..90c11fe970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/ReaderState.java
@@ -344,29 +344,26 @@ class ReaderState {
*/
protected boolean next() {
switch (state) {
- case LOOK_AHEAD:
- case LOOK_AHEAD_WITH_EOF:
- // Use batch previously read.
- assert lookahead != null;
- lookahead.exchange(scanOp.containerAccessor.container());
- assert lookahead.getRecordCount() == 0;
- lookahead = null;
- if (state == State.LOOK_AHEAD_WITH_EOF) {
- state = State.EOF;
- } else {
- state = State.ACTIVE;
+ case LOOK_AHEAD:
+ case LOOK_AHEAD_WITH_EOF:
+ // Use batch previously read.
+ assert lookahead != null;
+ lookahead.exchange(scanOp.containerAccessor.container());
+ assert lookahead.getRecordCount() == 0;
+ lookahead = null;
+ if (state == State.LOOK_AHEAD_WITH_EOF) {
+ state = State.EOF;
+ } else {
+ state = State.ACTIVE;
+ }
+ return true;
+ case ACTIVE:
+ return readBatch();
+ case EOF:
+ return false;
+ default:
+ throw new IllegalStateException("Unexpected state: " + state);
}
- return true;
-
- case ACTIVE:
- return readBatch();
-
- case EOF:
- return false;
-
- default:
- throw new IllegalStateException("Unexpected state: " + state);
- }
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index e5755cb07b..a4ee14f075 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,10 +20,12 @@ package org.apache.drill.exec.store.easy.json.loader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
@@ -189,6 +191,16 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
return this;
}
+ public JsonLoaderBuilder fromString(String jsonString) {
+ try (InputStream targetStream = IOUtils.toInputStream(jsonString, Charset.defaultCharset())) {
+ return fromStream(targetStream);
+ } catch (IOException e) {
+ throw UserException.dataReadError(e)
+ .message("Could not read JSON string: " + jsonString)
+ .build(logger);
+ }
+ }
+
public JsonLoaderBuilder fromReader(Reader reader) {
this.reader = reader;
return this;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index d73d2527df..4d945c3df6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -18,6 +18,9 @@
package org.apache.drill.test;
import org.apache.drill.exec.alias.AliasRegistryProvider;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl.ResultSetOptions;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.metastore.MetastoreRegistry;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
@@ -244,6 +247,11 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
return null;
}
+ @Override
+ public ResultSetLoader getResultSetLoader() {
+ return new ResultSetLoaderImpl(allocator, new ResultSetOptions());
+ }
+
@Override
public ExecutorState getExecutorState() {
return executorState;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
index 2bace961c8..d127ac40bd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBatchIterator.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.test;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.resultSet.impl.PullResultSetReaderImpl.UpstreamSource;
@@ -156,6 +157,8 @@ public class QueryBatchIterator implements UpstreamSource, AutoCloseable {
QueryEvent event = listener.get();
if (event.type == QueryEvent.Type.EOF) {
state = State.EOF;
+ } else if (event.type == QueryEvent.Type.ERROR) {
+ throw DrillRuntimeException.create("Closed with outstanding buffers allocated");
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index fb1fe1549c..41b6ecea11 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -694,21 +694,21 @@ public class QueryBuilder {
QueryEvent event = listener.get();
switch (event.type)
{
- case BATCH:
- batchCount++;
- recordCount += event.batch.getHeader().getRowCount();
- event.batch.release();
- break;
- case EOF:
- state = event.state;
- break loop;
- case ERROR:
- throw event.error;
- case QUERY_ID:
- queryId = event.queryId;
- break;
- default:
- throw new IllegalStateException("Unexpected event: " + event.type);
+ case BATCH:
+ batchCount++;
+ recordCount += event.batch.getHeader().getRowCount();
+ event.batch.release();
+ break;
+ case EOF:
+ state = event.state;
+ break loop;
+ case ERROR:
+ throw event.error;
+ case QUERY_ID:
+ queryId = event.queryId;
+ break;
+ default:
+ throw new IllegalStateException("Unexpected event: " + event.type);
}
}
long end = System.currentTimeMillis();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
index d8e2da16e4..880c05e759 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryResultSet.java
@@ -58,32 +58,27 @@ public class QueryResultSet {
}
while (true) {
QueryEvent event = listener.get();
- switch (event.type)
- {
- case BATCH:
- batchCount++;
- recordCount += event.batch.getHeader().getRowCount();
- loader.load(event.batch.getHeader().getDef(), event.batch.getData());
- event.batch.release();
- return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
-
- case EOF:
- state = event.state;
- eof = true;
- return null;
-
- case ERROR:
- state = event.state;
- eof = true;
- throw event.error;
-
- case QUERY_ID:
- queryId = event.queryId;
- continue;
-
- default:
- throw new IllegalStateException("Unexpected event: " + event.type);
- }
+ switch (event.type) {
+ case BATCH:
+ batchCount++;
+ recordCount += event.batch.getHeader().getRowCount();
+ loader.load(event.batch.getHeader().getDef(), event.batch.getData());
+ event.batch.release();
+ return DirectRowSet.fromVectorAccessible(loader.allocator(), loader);
+ case EOF:
+ state = event.state;
+ eof = true;
+ return null;
+ case ERROR:
+ state = event.state;
+ eof = true;
+ throw event.error;
+ case QUERY_ID:
+ queryId = event.queryId;
+ continue;
+ default:
+ throw new IllegalStateException("Unexpected event: " + event.type);
+ }
}
}
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
index 717ef29693..a9270423ad 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/memory/BaseAllocator.java
@@ -57,7 +57,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
* because the buffer is used multiple times by an operator.
*/
- private static final int IO_BUFFER_SIZE = 32*1024;
+ private static final int IO_BUFFER_SIZE = 32 * 1024;
private final Object DEBUG_LOCK = DEBUG ? new Object() : null;
private final BaseAllocator parentAllocator;
@@ -500,13 +500,13 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
if (allocatedCount > 0) {
throw new IllegalStateException(
String.format("Allocator[%s] closed with outstanding buffers allocated (%d).\n%s",
- name, allocatedCount, toString()));
+ name, allocatedCount, this));
}
if (reservations.size() != 0) {
throw new IllegalStateException(
String.format("Allocator[%s] closed with outstanding reservations (%d).\n%s", name, reservations.size(),
- toString()));
+ this));
}
}
@@ -535,8 +535,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
@Override
public String toString() {
- final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE
- : Verbosity.BASIC;
+ final Verbosity verbosity = logger.isTraceEnabled() ? Verbosity.LOG_WITH_STACKTRACE : Verbosity.BASIC;
final StringBuilder sb = new StringBuilder();
print(sb, 0, verbosity);
return sb.toString();
@@ -562,8 +561,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
/**
* Rounds up the provided value to the nearest power of two.
*
- * @param val
- * An integer value.
+ * @param val An integer value.
* @return The closest power of two of that value.
*/
public static int nextPowerOfTwo(int val) {
@@ -578,8 +576,7 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
/**
* Rounds up the provided value to the nearest power of two.
*
- * @param val
- * An integer long value.
+ * @param val An integer long value.
* @return The closest power of two of that value.
*/
public static long longNextPowerOfTwo(long val) {
@@ -622,7 +619,6 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
}
synchronized (DEBUG_LOCK) {
-
final long allocated = getAllocatedMemory();
// verify my direct descendants
@@ -689,9 +685,9 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
sb.append("allocator[");
sb.append(name);
sb.append("]\nallocated: ");
- sb.append(Long.toString(allocated));
+ sb.append(allocated);
sb.append(" allocated - (bufferTotal + reservedTotal + childTotal): ");
- sb.append(Long.toString(allocated - (bufferTotal + reservedTotal + childTotal)));
+ sb.append(allocated - (bufferTotal + reservedTotal + childTotal));
sb.append('\n');
if (bufferTotal != 0) {
@@ -703,14 +699,14 @@ public abstract class BaseAllocator extends Accountant implements BufferAllocato
if (childTotal != 0) {
sb.append("child total: ");
- sb.append(Long.toString(childTotal));
+ sb.append(childTotal);
sb.append('\n');
for (final BaseAllocator childAllocator : childSet) {
sb.append("child allocator[");
sb.append(childAllocator.name);
sb.append("] owned ");
- sb.append(Long.toString(childAllocator.getAllocatedMemory()));
+ sb.append(childAllocator.getAllocatedMemory());
sb.append('\n');
}
}
diff --git a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
index 4345a82c81..df4390feae 100644
--- a/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
+++ b/exec/memory/base/src/main/java/org/apache/drill/exec/ops/BufferManager.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
/**
* Manages a list of {@link DrillBuf}s that can be reallocated as needed. Upon
@@ -62,5 +63,7 @@ public interface BufferManager extends AutoCloseable {
*/
public DrillBuf getManagedBuffer(int size);
+ BufferAllocator getAllocator();
+
public void close();
}