You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/06/07 15:41:47 UTC
[drill] branch master updated: DRILL-8242: Fix output for HttpHelperFunctions (#2568)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 570ce63088 DRILL-8242: Fix output for HttpHelperFunctions (#2568)
570ce63088 is described below
commit 570ce63088ab4e2b479b53a9b41696adbb0c9278
Author: Vitalii Diravka <vi...@apache.org>
AuthorDate: Tue Jun 7 18:41:41 2022 +0300
DRILL-8242: Fix output for HttpHelperFunctions (#2568)
* DRILL-8242: Fix output for HttpHelperFunctions
- add test to check the http_get function output
* DRILL-8242: Fix output for HttpHelperFunctions
* DRILL-8242: Fix old approach for other functions
* DRILL-8242: replace String with InputStream for SimpleHTTP requests
* Fix TestHttpPlugin#testSlowResponse: the failure message is sometimes "DATA_READ ERROR: Read timed out" rather than "DATA_READ ERROR: timeout", so accept both
---
.../exec/store/http/udfs/HttpHelperFunctions.java | 32 ++++-----------
.../drill/exec/store/http/util/SimpleHttp.java | 40 ++++++++++++------
.../drill/exec/store/http/TestHttpPlugin.java | 3 +-
.../exec/store/http/TestHttpUDFFunctions.java | 38 +++++++++++++++++
.../exec/expr/fn/DrillComplexWriterFuncHolder.java | 34 ++++++++++-----
.../apache/drill/exec/ops/BufferManagerImpl.java | 9 ++--
.../physical/impl/project/ProjectBatchBuilder.java | 4 +-
.../impl/project/ProjectMemoryManager.java | 2 +-
.../physical/impl/project/ProjectRecordBatch.java | 48 +++++++++++++++-------
.../impl/project/ProjectionMaterializer.java | 3 +-
.../exec/physical/impl/project/Projector.java | 8 ++--
.../physical/impl/project/ProjectorTemplate.java | 3 +-
.../physical/resultSet/impl/ColumnBuilder.java | 6 +--
.../resultSet/impl/ResultSetLoaderImpl.java | 3 +-
.../drill/exec/record/AbstractRecordBatch.java | 14 +------
.../org/apache/drill/exec/record/BatchSchema.java | 3 +-
.../store/easy/json/loader/JsonLoaderImpl.java | 12 ------
.../vector/complex/impl/VectorContainerWriter.java | 4 ++
.../rest/spnego/TestDrillSpnegoAuthenticator.java | 1 +
.../vector/complex/impl/ComplexWriterImpl.java | 14 +++----
20 files changed, 165 insertions(+), 116 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
index c669677f70..1ae05a501f 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/udfs/HttpHelperFunctions.java
@@ -45,7 +45,7 @@ public class HttpHelperFunctions {
@Param
NullableVarCharHolder[] inputReaders;
- @Output
+ @Output // todo: remove. Not used in this UDF
ComplexWriter writer;
@Inject
@@ -75,32 +75,25 @@ public class HttpHelperFunctions {
// as an approximation of null-if-null handling.
if (args == null) {
// Return empty map
- org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
- mapWriter.start();
- mapWriter.end();
return;
}
String finalUrl = org.apache.drill.exec.store.http.util.SimpleHttp.mapPositionalParameters(url, args);
// Make the API call
- String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeSimpleGetRequest(finalUrl);
+ java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.getRequestAndStreamResponse(finalUrl);
// If the result string is null or empty, return an empty map
- if (results == null || results.length() == 0) {
+ if (results == null) {
// Return empty map
- org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
- mapWriter.start();
- mapWriter.end();
return;
}
try {
- jsonLoaderBuilder.fromString(results);
+ jsonLoaderBuilder.fromStream(results);
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
loader.startBatch();
jsonLoader.readBatch();
- loader.close();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
@@ -119,7 +112,7 @@ public class HttpHelperFunctions {
@Param
NullableVarCharHolder[] inputReaders;
- @Output
+ @Output // todo: remove. Not used in this UDF
ComplexWriter writer;
@Inject
@@ -174,34 +167,27 @@ public class HttpHelperFunctions {
// as an approximation of null-if-null handling.
if (args == null) {
// Return empty map
- org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
- mapWriter.start();
- mapWriter.end();
return;
}
- String results = org.apache.drill.exec.store.http.util.SimpleHttp.makeAPICall(
+ java.io.InputStream results = org.apache.drill.exec.store.http.util.SimpleHttp.apiCall(
plugin,
endpointConfig,
drillbitContext,
args
- );
+ ).getInputStream();
// If the result string is null or empty, return an empty map
- if (results == null || results.length() == 0) {
+ if (results == null) {
// Return empty map
- org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter mapWriter = writer.rootAsMap();
- mapWriter.start();
- mapWriter.end();
return;
}
try {
- jsonLoaderBuilder.fromString(results);
+ jsonLoaderBuilder.fromStream(results);
org.apache.drill.exec.store.easy.json.loader.JsonLoader jsonLoader = jsonLoaderBuilder.build();
loader.startBatch();
jsonLoader.readBatch();
- loader.close();
} catch (Exception e) {
throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 0ec18de858..e5f0ce70d5 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -30,6 +30,7 @@ import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
+import okhttp3.ResponseBody;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.logical.OAuthConfig;
@@ -873,7 +874,7 @@ public class SimpleHttp {
* @param args An optional list of parameter arguments which will be included in the URL
* @return A String of the results.
*/
- public static String makeAPICall(
+ public static SimpleHttp apiCall(
HttpStoragePlugin plugin,
HttpApiConfig endpointConfig,
DrillbitContext context,
@@ -895,7 +896,7 @@ public class SimpleHttp {
}
// Now get the client
- SimpleHttp client = new SimpleHttpBuilder()
+ return new SimpleHttpBuilder()
.pluginConfig(pluginConfig)
.endpointConfig(endpointConfig)
.tempDir(new File(context.getConfig().getString(ExecConstants.DRILL_TMP_DIR)))
@@ -903,8 +904,6 @@ public class SimpleHttp {
.proxyConfig(proxyConfig)
.tokenTable(plugin.getTokenTable())
.build();
-
- return client.getResultsFromApiCall();
}
public static OkHttpClient getSimpleHttpClient() {
@@ -915,7 +914,29 @@ public class SimpleHttp {
.build();
}
- public static String makeSimpleGetRequest(String url) {
+ public static String getRequestAndStringResponse(String url) {
+ try {
+ return makeSimpleGetRequest(url).string();
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("HTTP request failed")
+ .build(logger);
+ }
+ }
+
+ public static InputStream getRequestAndStreamResponse(String url) {
+ try {
+ return makeSimpleGetRequest(url).byteStream();
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("HTTP request failed")
+ .build(logger);
+ }
+ }
+
+ public static ResponseBody makeSimpleGetRequest(String url) throws IOException {
OkHttpClient client = getSimpleHttpClient();
Request.Builder requestBuilder = new Request.Builder()
.url(url);
@@ -924,15 +945,8 @@ public class SimpleHttp {
Request request = requestBuilder.build();
// Execute the request
- try {
Response response = client.newCall(request).execute();
- return response.body().string();
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .message("HTTP request failed")
- .build(logger);
- }
+ return response.body();
}
/**
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 458003f5f3..928975f17a 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -1127,7 +1127,8 @@ public class TestHttpPlugin extends ClusterTest {
client.queryBuilder().sql(sql).rowSet();
fail();
} catch (Exception e) {
- assertTrue("Not timeout exception, " + e, e.getMessage().contains("DATA_READ ERROR: timeout"));
+ assertTrue("Not timeout exception, " + e,
+ e.getMessage().contains("DATA_READ ERROR: timeout") || e.getMessage().contains("DATA_READ ERROR: Read timed out"));
}
}
}
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
index 24d73baa02..fb30bef5aa 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpUDFFunctions.java
@@ -24,10 +24,16 @@ import okhttp3.mockwebserver.MockWebServer;
import okhttp3.mockwebserver.RecordedRequest;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.util.DrillFileUtils;
+import org.apache.drill.exec.physical.impl.project.ProjectMemoryManager;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorBatchIterator;
+import org.apache.drill.exec.physical.resultSet.impl.ResultSetLoaderImpl;
import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
@@ -37,6 +43,7 @@ import org.apache.drill.shaded.guava.com.google.common.io.Files;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.LogFixture;
+import org.apache.drill.test.rowSet.RowSetUtilities;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -47,6 +54,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapValue;
+import static org.apache.drill.test.rowSet.RowSetUtilities.singleMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -63,9 +72,11 @@ public class TestHttpUDFFunctions extends ClusterTest {
public static void setup() throws Exception {
logFixture = LogFixture.builder()
.toConsole()
+ .logger(ProjectMemoryManager.class, CURRENT_LOG_LEVEL)
.logger(ProjectRecordBatch.class, CURRENT_LOG_LEVEL)
.logger(JsonLoaderImpl.class, CURRENT_LOG_LEVEL)
.logger(IteratorValidatorBatchIterator.class, CURRENT_LOG_LEVEL)
+ .logger(ResultSetLoaderImpl.class, CURRENT_LOG_LEVEL)
.build();
startCluster(ClusterFixture.builder(dirTestWatcher));
TEST_JSON_RESPONSE = Files.asCharSource(DrillFileUtils.getResourceAsFile("/data/simple.json"), Charsets.UTF_8).read();
@@ -98,6 +109,33 @@ public class TestHttpUDFFunctions extends ClusterTest {
RowSet results = client.queryBuilder().sql(sql).rowSet();
assertEquals(1, results.rowCount());
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addMap("result")
+ .addMap("results")
+ .addNullable("sunrise", TypeProtos.MinorType.VARCHAR)
+ .addNullable("sunset", TypeProtos.MinorType.VARCHAR)
+ .addNullable("solar_noon", TypeProtos.MinorType.VARCHAR)
+ .addNullable("day_length", TypeProtos.MinorType.VARCHAR)
+ .addNullable("civil_twilight_begin", TypeProtos.MinorType.VARCHAR)
+ .addNullable("civil_twilight_end", TypeProtos.MinorType.VARCHAR)
+ .addNullable("nautical_twilight_begin", TypeProtos.MinorType.VARCHAR)
+ .addNullable("nautical_twilight_end", TypeProtos.MinorType.VARCHAR)
+ .addNullable("astronomical_twilight_begin", TypeProtos.MinorType.VARCHAR)
+ .addNullable("astronomical_twilight_end", TypeProtos.MinorType.VARCHAR)
+ .resumeMap()
+ .addNullable("status", TypeProtos.MinorType.VARCHAR)
+ .resumeSchema()
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(singleMap(
+ mapValue(
+ mapValue("6:13:58 AM", "5:59:55 PM", "12:06:56 PM", "11:45:57", "5:48:14 AM",
+ "6:25:38 PM", "5:18:16 AM", "6:55:36 PM", "4:48:07 AM", "7:25:45 PM"),
+ "OK")))
+ .build();
+
+ RowSetUtilities.verify(expected, results);
results.clear();
RecordedRequest recordedRequest = server.takeRequest();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
index 92fb340128..c95415ef81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/DrillComplexWriterFuncHolder.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
import org.apache.drill.exec.physical.impl.project.ProjectRecordBatch;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.VectorAccessibleComplexWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
@@ -56,26 +57,39 @@ public class DrillComplexWriterFuncHolder extends DrillSimpleFuncHolder {
JBlock sub = new JBlock(true, true);
JBlock topSub = sub;
- JVar complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
+ JVar rsLoader = null;
+ JVar complexWriter = null;
+ JInvocation container = null;
+ for (JVar workspaceJVar : workspaceJVars) {
+ if ("ResultSetLoader".equals(workspaceJVar.type().name())) {
+ rsLoader = workspaceJVar;
+ }
+ }
+ if (rsLoader == null) {
+ complexWriter = classGenerator.declareClassField("complexWriter", classGenerator.getModel()._ref(ComplexWriter.class));
- JInvocation container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+ container = classGenerator.getMappingSet().getOutgoing().invoke("getOutgoingContainer");
+ }
//Default name is "col", if not passed in a reference name for the output vector.
String refName = fieldReference == null ? "col" : fieldReference.getRootSegment().getPath();
JClass cwClass = classGenerator.getModel().ref(VectorAccessibleComplexWriter.class);
- classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+ if (rsLoader == null) {
+ classGenerator.getSetupBlock().assign(complexWriter, cwClass.staticInvoke("getWriter").arg(refName).arg(container));
+ }
JClass projBatchClass = classGenerator.getModel().ref(ProjectRecordBatch.class);
JExpression projBatch = JExpr.cast(projBatchClass, classGenerator.getMappingSet().getOutgoing());
-
- classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
-
-
- classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
-
- sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
+ if (rsLoader == null) {
+ classGenerator.getSetupBlock().add(projBatch.invoke("addComplexWriter").arg(complexWriter));
+ classGenerator.getEvalBlock().add(complexWriter.invoke("setPosition").arg(classGenerator.getMappingSet().getValueWriteIndex()));
+ sub.decl(classGenerator.getModel()._ref(ComplexWriter.class), getReturnValue().getName(), complexWriter);
+ } else {
+ classGenerator.getSetupBlock().add(projBatch.invoke("addLoader").arg(rsLoader));
+ sub.decl(classGenerator.getModel()._ref(ResultSetLoader.class), getReturnValue().getName(), rsLoader);
+ }
// add the subblock after the out declaration.
classGenerator.getEvalBlock().add(topSub);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
index c42e7de93a..ea516c6b5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/BufferManagerImpl.java
@@ -34,12 +34,9 @@ public class BufferManagerImpl implements BufferManager {
@Override
public void close() {
- managedBuffers.forEach(new LongObjectPredicate<DrillBuf>() {
- @Override
- public boolean apply(long key, DrillBuf value) {
- value.release();
- return true;
- }
+ managedBuffers.forEach((LongObjectPredicate<DrillBuf>) (key, value) -> {
+ value.release();
+ return true;
});
managedBuffers.clear();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
index efb40d4c49..d4a7e69d19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchBuilder.java
@@ -95,7 +95,9 @@ public class ProjectBatchBuilder implements ProjectionMaterializer.BatchBuilder
@Override
public void addComplexField(FieldReference ref) {
- initComplexWriters();
+ if (projectBatch.rsLoader == null) {
+ initComplexWriters();
+ }
if (projectBatch.complexFieldReferencesList == null) {
projectBatch.complexFieldReferencesList = Lists.newArrayList();
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
index 571871b9fc..1391d64480 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectMemoryManager.java
@@ -278,7 +278,7 @@ public class ProjectMemoryManager extends RecordBatchMemoryManager {
+ ", total variable width {}, total complex width {}, batchSizer time {} ms, update time {} ms"
+ ", manager {}, incoming {}",outPutRowCount, batchSizer.rowCount(), incomingBatch.getRecordCount(),
rowWidth, totalFixedWidthColumnWidth, totalVariableColumnWidth, totalComplexColumnWidth,
- (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
+ (batchSizerEndTime - updateStartTime),(updateEndTime - updateStartTime), this, incomingBatch);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT, getRecordBatchSizer(), outgoingBatch.getRecordBatchStatsContext());
updateIncomingStats();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 60f8a1edf4..89e1beb6ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -19,20 +19,25 @@ package org.apache.drill.exec.physical.impl.project;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.SimpleRecordBatch;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchIOType;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +51,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
private static final Logger logger = LoggerFactory.getLogger(ProjectRecordBatch.class);
protected List<ValueVector> allocationVectors;
+ @Deprecated // use new writer rsLoader
protected List<ComplexWriter> complexWriters;
+ protected ResultSetLoader rsLoader;
protected List<FieldReference> complexFieldReferencesList;
protected ProjectMemoryManager memoryManager;
private Projector projector;
@@ -103,7 +110,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
memoryManager.update();
if (first && incomingRecordCount == 0) {
- if (complexWriters != null) {
+ if (complexWriters != null || rsLoader != null ) {
IterOutcome next = null;
while (incomingRecordCount == 0) {
if (getLastKnownOutcome() == EMIT) {
@@ -138,7 +145,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
}
- if (complexWriters != null && getLastKnownOutcome() == EMIT) {
+ if ((complexWriters != null || rsLoader != null) && getLastKnownOutcome() == EMIT) {
throw UserException.unsupportedError()
.message("Currently functions producing complex types as output are not " +
"supported in project list for subquery between LATERAL and UNNEST. Please re-write the query using this " +
@@ -170,13 +177,20 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
- if (complexWriters != null) {
+ if (rsLoader != null && !rsLoader.isProjectionEmpty()) {
+ MapVector map = container.addOrGet(container.getLast().getField().getName(), Types.required(TypeProtos.MinorType.MAP), MapVector.class);
+ map.setMapValueCount(recordCount);
+ for (VectorWrapper<?> vectorWrapper : rsLoader.harvest()) {
+ ValueVector valueVector = vectorWrapper.getValueVector();
+ map.putChild(valueVector.getField().getName(), valueVector);
+ }
+ container.buildSchema(SelectionVectorMode.NONE);
+ } else if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
memoryManager.updateOutgoingStats(outputRecords);
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.OUTPUT, this, getRecordBatchStatsContext());
-
// Get the final outcome based on hasRemainder since that will determine if all the incoming records were
// consumed in current output batch or not
return getFinalOutcome(hasRemainder);
@@ -210,7 +224,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
}
// In case of complex writer expression, vectors would be added to batch run-time.
// We have to re-build the schema.
- if (complexWriters != null) {
+ if (complexWriters != null || rsLoader != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
@@ -224,6 +238,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
complexWriters.add(writer);
}
+ public void addLoader(ResultSetLoader loader) {
+ rsLoader = loader;
+ }
+
private void doAlloc(int recordCount) {
// Allocate vv in the allocationVectors.
for (ValueVector v : allocationVectors) {
@@ -251,12 +269,12 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
// the transfer pairs or vector copies.
container.setRecordCount(count);
- if (complexWriters == null) {
- return;
- }
-
- for (ComplexWriter writer : complexWriters) {
- writer.setValueCount(count);
+ if (complexWriters != null) {
+ for (ComplexWriter writer : complexWriters) {
+ writer.setValueCount(count);
+ }
+ } else if (rsLoader != null) {
+ rsLoader.setTargetRowCount(count);
}
}
@@ -276,8 +294,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
setupNewSchema(incomingBatch, configuredBatchSize);
- ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this,
- container, callBack, incomingBatch);
+ ProjectBatchBuilder batchBuilder = new ProjectBatchBuilder(this, container, callBack, incomingBatch);
ProjectionMaterializer em = new ProjectionMaterializer(context.getOptions(),
incomingBatch, popConfig.getExprs(), context.getFunctionRegistry(),
batchBuilder, unionTypeEnabled);
@@ -341,7 +358,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
protected IterOutcome getFinalOutcome(boolean hasMoreRecordInBoundary) {
// In a case of complex writers vectors are added at runtime, so the schema
// may change (e.g. when a batch contains new column(s) not present in previous batches)
- if (complexWriters != null) {
+ if (complexWriters != null || rsLoader != null) {
return IterOutcome.OK_NEW_SCHEMA;
}
return super.getFinalOutcome(hasMoreRecordInBoundary);
@@ -359,6 +376,9 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
if (complexWriters != null) {
container.clear();
+ } else if (rsLoader != null) {
+ container.clear();
+ rsLoader.close();
} else {
// Release the underlying DrillBufs and reset the ValueVectors to empty
// Not clearing the container here is fine since Project output schema is
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
index ee011b658e..55c630ac23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectionMaterializer.java
@@ -342,8 +342,7 @@ class ProjectionMaterializer {
transferFieldIds.add(fid);
}
- private void setupFnCall(NamedExpression namedExpression,
- LogicalExpression expr) {
+ private void setupFnCall(NamedExpression namedExpression, LogicalExpression expr) {
// Need to process ComplexWriter function evaluation.
// The reference name will be passed to ComplexWriter, used as the name of
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index 5e5e7ebe55..53d27807c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -27,9 +27,9 @@ import java.util.List;
public interface Projector {
- public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
- public abstract int projectRecords(RecordBatch incomingBatch, int startIndex, int recordCount, int firstOutputIndex);
+ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
+ int projectRecords(RecordBatch incomingBatch, int startIndex, int recordCount, int firstOutputIndex);
- public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
+ TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
-}
\ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index dd30c443f0..9c1aced107 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -88,8 +88,7 @@ public abstract class ProjectorTemplate implements Projector {
@Override
public final void setup(FragmentContext context, RecordBatch incoming,
- RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException{
-
+ RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException {
this.svMode = incoming.getSchema().getSelectionVectorMode();
switch (svMode) {
case FOUR_BYTE:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
index 2731d3f6af..45f4b1f456 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ColumnBuilder.java
@@ -253,8 +253,7 @@ public class ColumnBuilder {
// have content that varies from batch to batch. Only the leaf
// vectors can be cached.
assert columnSchema.tupleSchema().isEmpty();
- mapVector = new RepeatedMapVector(mapColSchema.schema(),
- parent.loader().allocator(), null);
+ mapVector = new RepeatedMapVector(mapColSchema.schema(), parent.loader().allocator(), null);
offsetVector = mapVector.getOffsetVector();
} else {
mapVector = null;
@@ -262,8 +261,7 @@ public class ColumnBuilder {
}
// Create the writer using the offset vector
- final AbstractObjectWriter writer = MapWriter.buildMapArray(
- columnSchema, mapVector, new ArrayList<>());
+ final AbstractObjectWriter writer = MapWriter.buildMapArray(columnSchema, mapVector, new ArrayList<>());
// Wrap the offset vector in a vector state
VectorState offsetVectorState;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
index 5ec24ac871..10cca0b648 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/impl/ResultSetLoaderImpl.java
@@ -34,8 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Implementation of the result set loader. Caches vectors
- * for a row or map.
+ * Implementation of the result set loader. Caches vectors for a row or map.
*
* @see {@link ResultSetLoader}
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 6e363036a2..96e1fb16a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -31,7 +31,6 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.server.options.OptionValue;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,17 +67,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
this.batchStatsContext = new RecordBatchStatsContext(context, oContext);
stats = oContext.getStats();
container = new VectorContainer(this.oContext.getAllocator());
- if (buildSchema) {
- state = BatchState.BUILD_SCHEMA;
- } else {
- state = BatchState.FIRST;
- }
- OptionValue option = context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE.getOptionName());
- if (option != null) {
- unionTypeEnabled = option.bool_val;
- } else {
- unionTypeEnabled = false;
- }
+ state = buildSchema ? BatchState.BUILD_SCHEMA : BatchState.FIRST;
+ unionTypeEnabled = context.getOptions().getBoolean(ExecConstants.ENABLE_UNION_TYPE_KEY);
}
public enum BatchState {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
index d4d385c248..fcffeea8aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/BatchSchema.java
@@ -247,8 +247,7 @@ public class BatchSchema implements Iterable<MaterializedField> {
otherSchema.selectionVectorMode != SelectionVectorMode.NONE) {
throw new IllegalArgumentException("Cannot merge schemas with selection vectors");
}
- List<MaterializedField> mergedFields =
- new ArrayList<>(fields.size() + otherSchema.fields.size());
+ List<MaterializedField> mergedFields = new ArrayList<>(fields.size() + otherSchema.fields.size());
mergedFields.addAll(this.fields);
mergedFields.addAll(otherSchema.fields);
return new BatchSchema(selectionVectorMode, mergedFields);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index a4ee14f075..e5755cb07b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.store.easy.json.loader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.EmptyErrorContext;
import org.apache.drill.common.exceptions.UserException;
@@ -191,16 +189,6 @@ public class JsonLoaderImpl implements JsonLoader, ErrorFactory {
return this;
}
- public JsonLoaderBuilder fromString(String jsonString) {
- try (InputStream targetStream = IOUtils.toInputStream(jsonString, Charset.defaultCharset())) {
- return fromStream(targetStream);
- } catch (IOException e) {
- throw UserException.dataReadError(e)
- .message("Could not read JSON string: " + jsonString)
- .build(logger);
- }
- }
-
public JsonLoaderBuilder fromReader(Reader reader) {
this.reader = reader;
return this;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index 5866a9562d..04e30cf0e6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -26,6 +26,10 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.MapVector;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+/**
+ * @deprecated Use {@link org.apache.drill.exec.physical.resultSet.ResultSetLoader}, the replacement for this class.
+ */
+@Deprecated
public class VectorContainerWriter extends AbstractFieldWriter implements ComplexWriter {
//private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(VectorContainerWriter.class);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
index 914688e9b2..643af83bf2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/spnego/TestDrillSpnegoAuthenticator.java
@@ -67,6 +67,7 @@ import static org.mockito.Mockito.verify;
/**
* Test for validating {@link DrillSpnegoAuthenticator}
*/
+@Ignore("See DRILL-5387")
@Category(SecurityTest.class)
public class TestDrillSpnegoAuthenticator extends BaseTest {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
index 0a254adb9f..8a2ffeab02 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/ComplexWriterImpl.java
@@ -105,13 +105,13 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri
@Override
public void setPosition(int index){
super.setPosition(index);
- switch(mode){
- case MAP:
- mapRoot.setPosition(index);
- break;
- case LIST:
- listRoot.setPosition(index);
- break;
+ switch (mode) {
+ case MAP:
+ mapRoot.setPosition(index);
+ break;
+ case LIST:
+ listRoot.setPosition(index);
+ break;
}
}