Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/24 14:52:26 UTC
[drill] branch master updated: DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new fb016f4 DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)
fb016f4 is described below
commit fb016f4f07c5ace99f6b5e8dba236627538b4812
Author: Paul Rogers <pa...@users.noreply.github.com>
AuthorDate: Thu Mar 24 07:52:19 2022 -0700
DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2 (#2485)
* DRILL-8086: Convert the CSV (AKA "compliant text") reader to EVF V2
Also includes:
* DRILL-8159: Convert the HTTPD reader to use EVF V2
* Build fix
* Changes from review comments
* Fix test issue
---
.../exec/store/httpd/HttpdLogBatchReader.java | 50 ++--
.../exec/store/httpd/HttpdLogFormatPlugin.java | 35 +--
.../apache/drill/exec/store/httpd/HttpdParser.java | 4 +-
.../drill/exec/store/http/TestHttpPlugin.java | 68 ++---
.../exec/store/easy/json/JsonRecordWriter.java | 8 +-
.../exec/store/easy/text/TextFormatConfig.java | 130 +++++++++
.../exec/store/easy/text/TextFormatPlugin.java | 172 ++----------
.../easy/text/reader/CompliantTextBatchReader.java | 301 +++++++++++----------
.../easy/text/reader/ConstrainedFieldOutput.java | 7 +-
.../store/easy/text/reader/FieldVarCharOutput.java | 14 +
.../easy/text/reader/TextParsingSettings.java | 3 +-
.../exec/store/easy/text/reader/TextReader.java | 7 +-
.../exec/physical/impl/writer/TestTextWriter.java | 2 +-
.../drill/exec/server/rest/TestRestJson.java | 2 +-
.../drill/exec/store/DropboxFileSystemTest.java | 2 +-
.../drill/exec/store/TestPluginRegistry.java | 2 +-
.../store/dfs/TestFormatPluginOptionExtractor.java | 2 +-
.../store/easy/text/compliant/BaseCsvTest.java | 13 +-
.../easy/text/compliant/TestCsvWithHeaders.java | 40 ++-
.../easy/text/compliant/TestCsvWithSchema.java | 90 +++++-
.../store/easy/text/compliant/TestTextReader.java | 6 +-
.../drill/exec/store/mock/TestMockRowReader.java | 13 +-
.../drill/exec/util/StoragePluginTestUtils.java | 2 +-
23 files changed, 555 insertions(+), 418 deletions(-)
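The heart of the conversion shows up in the reader diffs below: an EVF V2 reader implements the plain ManagedReader interface, does all of its setup in the constructor (which now receives the FileSchemaNegotiator), reports an empty input by throwing EarlyEofException instead of returning false from open(), and keeps only next() and close(). A minimal sketch of that contract, using the v3 scan types this patch imports; ExampleBatchReader and readLine() are illustrative names, not part of this commit:

    import org.apache.drill.common.types.TypeProtos.MinorType;
    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
    import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;
    import org.apache.drill.exec.record.metadata.SchemaBuilder;
    import org.apache.drill.exec.record.metadata.TupleMetadata;

    public class ExampleBatchReader implements ManagedReader {
      private final RowSetLoader rowWriter;

      // V2: open in the constructor; an empty input is an EarlyEofException,
      // replacing V1's "return false from open()".
      public ExampleBatchReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
        TupleMetadata schema = new SchemaBuilder()
            .add("value", MinorType.VARCHAR)
            .buildSchema();
        negotiator.tableSchema(schema, true);
        rowWriter = negotiator.build().writer();
      }

      @Override
      public boolean next() {
        while (!rowWriter.isFull()) {
          String line = readLine();   // stand-in for the real input source
          if (line == null) {
            return false;             // end of input: no more batches
          }
          rowWriter.start();
          rowWriter.scalar(0).setString(line);
          rowWriter.save();
        }
        return true;                  // batch is full; more input remains
      }

      @Override
      public void close() { }         // release streams and buffers here

      private String readLine() { return null; }
    }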
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
index 275132a..664fd7c 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogBatchReader.java
@@ -22,8 +22,9 @@ import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
@@ -31,7 +32,7 @@ import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
-import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,36 +41,29 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
-public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class HttpdLogBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(HttpdLogBatchReader.class);
public static final String RAW_LINE_COL_NAME = "_raw";
public static final String MATCHED_COL_NAME = "_matched";
private final HttpdLogFormatConfig formatConfig;
- private final int maxRecords;
- private final EasySubScan scan;
- private HttpdParser parser;
- private FileSplit split;
+ private final HttpdParser parser;
+ private final FileDescrip file;
private InputStream fsStream;
- private RowSetLoader rowWriter;
+ private final RowSetLoader rowWriter;
private BufferedReader reader;
private int lineNumber;
- private CustomErrorContext errorContext;
- private ScalarWriter rawLineWriter;
- private ScalarWriter matchedWriter;
+ private final CustomErrorContext errorContext;
+ private final ScalarWriter rawLineWriter;
+ private final ScalarWriter matchedWriter;
private int errorCount;
-
- public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, int maxRecords, EasySubScan scan) {
+ public HttpdLogBatchReader(HttpdLogFormatConfig formatConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
this.formatConfig = formatConfig;
- this.maxRecords = maxRecords;
- this.scan = scan;
- }
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
// Open the input stream to the log file
- openFile(negotiator);
+ file = negotiator.file();
+ openFile();
errorContext = negotiator.parentErrorContext();
try {
parser = new HttpdParser(
@@ -92,7 +86,6 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
parser.addFieldsToParser(rowWriter);
rawLineWriter = addImplicitColumn(RAW_LINE_COL_NAME, MinorType.VARCHAR);
matchedWriter = addImplicitColumn(MATCHED_COL_NAME, MinorType.BIT);
- return true;
}
@Override
@@ -108,11 +101,6 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
private boolean nextLine(RowSetLoader rowWriter) {
String line;
- // Check if the limit has been reached
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
try {
line = reader.readLine();
if (line == null) {
@@ -164,19 +152,19 @@ public class HttpdLogBatchReader implements ManagedReader<FileSchemaNegotiator>
try {
fsStream.close();
} catch (IOException e) {
- logger.warn("Error when closing HTTPD file: {} {}", split.getPath().toString(), e.getMessage());
+ logger.warn("Error when closing HTTPD file: {} {}", file.split().getPath().toString(), e.getMessage());
}
fsStream = null;
}
- private void openFile(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
+ private void openFile() {
+ Path path = file.split().getPath();
try {
- fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ fsStream = file.fileSystem().openPossiblyCompressedStream(path);
} catch (Exception e) {
throw UserException
.dataReadError(e)
- .message("Failed to open open input file: %s", split.getPath().toString())
+ .message("Failed to open open input file: %s", path.toString())
.addContext(e.getMessage())
.build(logger);
}
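One detail worth calling out in the hunks above: V1 readers pulled a FileSplit and a DrillFileSystem straight off the negotiator, while V2 routes both through FileDescrip. A sketch of the open pattern the reader now uses; FileOpenSketch and openInput() are illustrative wrappers around the calls shown above:

    import java.io.InputStream;
    import org.apache.drill.common.exceptions.UserException;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
    import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
    import org.apache.hadoop.fs.Path;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class FileOpenSketch {
      private static final Logger logger = LoggerFactory.getLogger(FileOpenSketch.class);

      static InputStream openInput(FileSchemaNegotiator negotiator) {
        FileDescrip file = negotiator.file();    // V2 replacement for negotiator.split()
        Path path = file.split().getPath();
        try {
          return file.fileSystem().openPossiblyCompressedStream(path);
        } catch (Exception e) {
          throw UserException.dataReadError(e)
              .message("Failed to open input file: %s", path)
              .build(logger);
        }
      }
    }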
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index ab6d80a..5c04fa3 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -21,12 +21,12 @@ package org.apache.drill.exec.store.httpd;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
@@ -41,18 +41,16 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
private static class HttpLogReaderFactory extends FileReaderFactory {
private final HttpdLogFormatConfig config;
- private final int maxRecords;
private final EasySubScan scan;
- private HttpLogReaderFactory(HttpdLogFormatConfig config, int maxRecords, EasySubScan scan) {
+ private HttpLogReaderFactory(HttpdLogFormatConfig config, EasySubScan scan) {
this.config = config;
- this.maxRecords = maxRecords;
this.scan = scan;
}
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new HttpdLogBatchReader(config, maxRecords, scan);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new HttpdLogBatchReader(config, scan, negotiator);
}
}
@@ -76,24 +74,15 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatConfig>
.fsConf(fsConf)
.defaultName(DEFAULT_NAME)
.readerOperatorType(OPERATOR_TYPE)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
- EasySubScan scan, OptionManager options) {
- return new HttpdLogBatchReader(formatConfig, scan.getMaxRecords(), scan);
- }
-
- @Override
- protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
- FileScanFramework.FileScanBuilder builder = new FileScanFramework.FileScanBuilder();
- builder.setReaderFactory(new HttpLogReaderFactory(formatConfig, scan.getMaxRecords(), scan));
-
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new HttpLogReaderFactory(formatConfig, scan));
}
}
+
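The plugin-side pattern is the same in both plugins this patch touches: easyConfig() flips the scan version to EVF_V2, the V1 newBatchReader()/frameworkBuilder() overrides go away, and the wiring collapses into configureScan() plus a FileReaderFactory whose newReader() takes the negotiator. Sketched below for an EasyFormatPlugin subclass; ExampleReaderFactory and ExampleBatchReader are illustrative names:

    private static class ExampleReaderFactory extends FileReaderFactory {
      @Override
      public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
        // V2 constructs and opens the reader in a single step.
        return new ExampleBatchReader(negotiator);
      }
    }

    @Override
    protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
      builder.nullType(Types.optional(MinorType.VARCHAR));
      builder.readerFactory(new ExampleReaderFactory());
    }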
diff --git a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
index c31c5ad..4aec836 100644
--- a/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
+++ b/contrib/format-httpd/src/main/java/org/apache/drill/exec/store/httpd/HttpdParser.java
@@ -186,9 +186,7 @@ public class HttpdParser {
dummy.addParseTarget(String.class.getMethod("indexOf", String.class), allParserPaths);
- /*
- If the column is not requested explicitly, remove it from the requested path list.
- */
+ // If the column is not requested explicitly, remove it from the requested path list.
if (!isStarQuery() &&
!isMetadataQuery() &&
!isOnlyImplicitColumns()) {
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 1ddbb05..dd1f13d 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -69,12 +69,18 @@ import static org.junit.Assert.fail;
*/
public class TestHttpPlugin extends ClusterTest {
- private static final int MOCK_SERVER_PORT = 8091;
+ // Use high-numbered ports to avoid colliding with other tools on the
+ // build machine.
+ private static final int MOCK_SERVER_PORT = 44332;
private static String TEST_JSON_RESPONSE;
private static String TEST_CSV_RESPONSE;
private static String TEST_XML_RESPONSE;
private static String TEST_JSON_RESPONSE_WITH_DATATYPES;
+ public static String makeUrl(String url) {
+ return String.format(url, MOCK_SERVER_PORT);
+ }
+
@BeforeClass
public static void setup() throws Exception {
startCluster(ClusterFixture.builder(dirTestWatcher));
@@ -149,7 +155,7 @@ public class TestHttpPlugin extends ClusterTest {
// The connection acts like a schema.
// Ignores the message body except for data.
HttpApiConfig mockSchema = HttpApiConfig.builder()
- .url("http://localhost:8091/json")
+ .url(makeUrl("http://localhost:%d/json"))
.method("GET")
.headers(headers)
.authType("basic")
@@ -166,7 +172,7 @@ public class TestHttpPlugin extends ClusterTest {
// This is the preferred approach, the base URL contains as much info as possible;
// all other parameters are specified in SQL. See README for an example.
HttpApiConfig mockTable = HttpApiConfig.builder()
- .url("http://localhost:8091/json")
+ .url(makeUrl("http://localhost:%d/json"))
.method("GET")
.headers(headers)
.authType("basic")
@@ -178,14 +184,14 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockPostConfig = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.headers(headers)
.postBody("key1=value1\nkey2=value2")
.build();
HttpApiConfig mockPostPushdownWithStaticParams = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.headers(headers)
.requireTail(false)
@@ -195,7 +201,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockPostPushdown = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.headers(headers)
.requireTail(false)
@@ -204,7 +210,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockJsonNullBodyPost = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.headers(headers)
.requireTail(false)
@@ -213,7 +219,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockJsonPostConfig = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.headers(headers)
.requireTail(false)
@@ -230,7 +236,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockJsonConfigWithPaginator = HttpApiConfig.builder()
- .url("http://localhost:8091/json")
+ .url(makeUrl("http://localhost:%d/json"))
.method("get")
.headers(headers)
.requireTail(false)
@@ -239,14 +245,14 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockPostConfigWithoutPostBody = HttpApiConfig.builder()
- .url("http://localhost:8091/")
+ .url(makeUrl("http://localhost:%d/"))
.method("POST")
.authType("basic")
.headers(headers)
.build();
HttpApiConfig mockCsvConfig = HttpApiConfig.builder()
- .url("http://localhost:8091/csv")
+ .url(makeUrl("http://localhost:%d/csv"))
.method("GET")
.headers(headers)
.authType("basic")
@@ -257,7 +263,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockCsvConfigWithPaginator = HttpApiConfig.builder()
- .url("http://localhost:8091/csv")
+ .url(makeUrl("http://localhost:%d/csv"))
.method("get")
.paginator(offsetPaginatorForJson)
.inputType("csv")
@@ -266,7 +272,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockXmlConfig = HttpApiConfig.builder()
- .url("http://localhost:8091/xml")
+ .url(makeUrl("http://localhost:%d/xml"))
.method("GET")
.headers(headers)
.authType("basic")
@@ -278,7 +284,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockGithubWithParam = HttpApiConfig.builder()
- .url("http://localhost:8091/orgs/{org}/repos")
+ .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
.method("GET")
.headers(headers)
.params(Arrays.asList("lat", "lng", "date"))
@@ -287,7 +293,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockGithubWithDuplicateParam = HttpApiConfig.builder()
- .url("http://localhost:8091/orgs/{org}/repos")
+ .url(makeUrl("http://localhost:%d/orgs/{org}/repos"))
.method("GET")
.headers(headers)
.params(Arrays.asList("org", "lng", "date"))
@@ -296,7 +302,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockGithubWithParamInQuery = HttpApiConfig.builder()
- .url("http://localhost:8091/orgs/{org}/repos?p1={p1}")
+ .url(makeUrl("http://localhost:%d/orgs/{org}/repos?p1={p1}"))
.method("GET")
.headers(headers)
.params(Arrays.asList("p2", "p3"))
@@ -305,7 +311,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
HttpApiConfig mockTableWithJsonOptions = HttpApiConfig.builder()
- .url("http://localhost:8091/json")
+ .url(makeUrl("http://localhost:%d/json"))
.method("GET")
.headers(headers)
.requireTail(false)
@@ -554,7 +560,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("http://localhost:8091/orgs/apache/repos")
+ .addRow(makeUrl("http://localhost:%d/orgs/apache/repos"))
.build();
RowSetUtilities.verify(expected, results);
@@ -580,7 +586,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("http://localhost:8091/orgs/true/repos")
+ .addRow(makeUrl("http://localhost:%d/orgs/true/repos"))
.build();
RowSetUtilities.verify(expected, results);
@@ -606,7 +612,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("http://localhost:8091/orgs/1234/repos")
+ .addRow(makeUrl("http://localhost:%d/orgs/1234/repos"))
.build();
RowSetUtilities.verify(expected, results);
@@ -657,7 +663,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("http://localhost:8091/orgs/apache/repos?org=apache")
+ .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?org=apache"))
.build();
RowSetUtilities.verify(expected, results);
@@ -683,7 +689,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("http://localhost:8091/orgs/apache/repos?p1=param1&p2=param2")
+ .addRow(makeUrl("http://localhost:%d/orgs/apache/repos?p1=param1&p2=param2"))
.build();
RowSetUtilities.verify(expected, results);
@@ -845,7 +851,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02")
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02"))
.build();
RowSetUtilities.verify(expected, results);
@@ -868,8 +874,8 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/csvcsv?arg1=4"))
.build();
RowSetUtilities.verify(expected, results);
@@ -892,11 +898,11 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
- .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
+ .addRow(200, "OK", "http/1.1", makeUrl("http://localhost:%d/xml?arg1=4"))
.build();
RowSetUtilities.verify(expected, results);
@@ -1227,7 +1233,7 @@ public class TestHttpPlugin extends ClusterTest {
.build();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(404, "Client Error", "http/1.1", "http://localhost:8091/json")
+ .addRow(404, "Client Error", "http/1.1", makeUrl("http://localhost:%d/json"))
.build();
RowSetUtilities.verify(expected, results);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index b73fa76..160ad8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -36,6 +36,8 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
@@ -43,7 +45,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWriter {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonRecordWriter.class);
+ private static final Logger logger = LoggerFactory.getLogger(JsonRecordWriter.class);
private static final String LINE_FEED = String.format("%n");
private Path cleanUpLocation;
@@ -54,8 +56,8 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
private boolean useExtendedOutput;
private int index;
- private FileSystem fs = null;
- private OutputStream stream = null;
+ private FileSystem fs;
+ private OutputStream stream;
private final JsonFactory factory = new JsonFactory();
private final StorageStrategy storageStrategy;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java
new file mode 100644
index 0000000..01ce072
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatConfig.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.text;
+
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
+
+@JsonTypeName(TextFormatPlugin.PLUGIN_NAME)
+@JsonInclude(Include.NON_DEFAULT)
+public class TextFormatConfig implements FormatPluginConfig {
+
+ public final List<String> extensions;
+ public final String lineDelimiter;
+ public final char fieldDelimiter;
+ public final char quote;
+ public final char escape;
+ public final char comment;
+ public final boolean skipFirstLine;
+ public final boolean extractHeader;
+
+ @JsonCreator
+ public TextFormatConfig(
+ @JsonProperty("extensions") List<String> extensions,
+ @JsonProperty("lineDelimiter") String lineDelimiter,
+ // Drill 1.17 and before used "delimiter" in the
+ // bootstrap storage plugins file, assume many instances
+ // exist in the field.
+ @JsonAlias("delimiter")
+ @JsonProperty("fieldDelimiter") String fieldDelimiter,
+ @JsonProperty("quote") String quote,
+ @JsonProperty("escape") String escape,
+ @JsonProperty("comment") String comment,
+ @JsonProperty("skipFirstLine") Boolean skipFirstLine,
+ @JsonProperty("extractHeader") Boolean extractHeader) {
+ this.extensions = extensions == null ?
+ ImmutableList.of() : ImmutableList.copyOf(extensions);
+ this.lineDelimiter = Strings.isNullOrEmpty(lineDelimiter) ? "\n" : lineDelimiter;
+ this.fieldDelimiter = Strings.isNullOrEmpty(fieldDelimiter) ? ',' : fieldDelimiter.charAt(0);
+ this.quote = Strings.isNullOrEmpty(quote) ? '"' : quote.charAt(0);
+ this.escape = Strings.isNullOrEmpty(escape) ? '"' : escape.charAt(0);
+ this.comment = Strings.isNullOrEmpty(comment) ? '#' : comment.charAt(0);
+ this.skipFirstLine = skipFirstLine == null ? false : skipFirstLine;
+ this.extractHeader = extractHeader == null ? false : extractHeader;
+ }
+
+ public List<String> getExtensions() { return extensions; }
+ public String getLineDelimiter() { return lineDelimiter; }
+ public char getFieldDelimiter() { return fieldDelimiter; }
+ public char getQuote() { return quote; }
+ public char getEscape() { return escape; }
+ public char getComment() { return comment; }
+ public boolean isSkipFirstLine() { return skipFirstLine; }
+ @JsonProperty("extractHeader")
+ public boolean isHeaderExtractionEnabled() { return extractHeader; }
+
+ /**
+ * Used for JSON serialization to handle \u0000 value which
+ * Jackson converts to a null string.
+ */
+ @JsonProperty("fieldDelimiter")
+ public String fieldDelimiterString() {
+ return Character.toString(fieldDelimiter);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(extensions, lineDelimiter, fieldDelimiter,
+ quote, escape, comment, skipFirstLine, extractHeader);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ TextFormatConfig other = (TextFormatConfig) obj;
+ return Objects.equals(extensions, other.extensions) &&
+ Objects.equals(lineDelimiter, other.lineDelimiter) &&
+ Objects.equals(fieldDelimiter, other.fieldDelimiter) &&
+ Objects.equals(quote, other.quote) &&
+ Objects.equals(escape, other.escape) &&
+ Objects.equals(comment, other.comment) &&
+ Objects.equals(skipFirstLine, other.skipFirstLine) &&
+ Objects.equals(extractHeader, other.extractHeader);
+ }
+
+ @Override
+ public String toString() {
+ return new PlanStringBuilder(this)
+ .field("extensions", extensions)
+ .field("skipFirstLine", skipFirstLine)
+ .field("extractHeader", extractHeader)
+ .escapedField("fieldDelimiter", fieldDelimiter)
+ .escapedField("lineDelimiter", lineDelimiter)
+ .escapedField("quote", quote)
+ .escapedField("escape", escape)
+ .escapedField("comment", comment)
+ .toString();
+ }
+}
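With the config extracted to a top-level class, it can be built in code the same way Jackson builds it from JSON; each null argument falls back to the defaults in the constructor above. A sketch, with values chosen purely for illustration:

    import java.util.Collections;
    import org.apache.drill.exec.store.easy.text.TextFormatConfig;

    TextFormatConfig csvConfig = new TextFormatConfig(
        Collections.singletonList("csv"),  // extensions
        null,   // lineDelimiter  -> "\n"
        null,   // fieldDelimiter -> ','
        null,   // quote          -> '"'
        null,   // escape         -> '"'
        null,   // comment        -> '#'
        null,   // skipFirstLine  -> false
        true);  // extractHeader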
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index a8b5736..ce8120c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -17,19 +17,9 @@
*/
package org.apache.drill.exec.store.easy.text;
-import com.fasterxml.jackson.annotation.JsonAlias;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonInclude.Include;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.ChildErrorContext;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
@@ -39,11 +29,11 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework.ColumnsScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.metadata.Propertied;
@@ -59,15 +49,12 @@ import org.apache.drill.exec.store.easy.text.reader.CompliantTextBatchReader;
import org.apache.drill.exec.store.easy.text.reader.TextParsingSettings;
import org.apache.drill.exec.store.easy.text.writer.TextRecordWriter;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
/**
* Text format plugin for CSV and other delimited text formats.
@@ -81,9 +68,8 @@ import java.util.Objects;
* to allow tight control of the size of produced batches (as well
* as to support provided schema.)
*/
-public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextFormatConfig> {
-
- private final static String PLUGIN_NAME = "text";
+public class TextFormatPlugin extends EasyFormatPlugin<TextFormatConfig> {
+ final static String PLUGIN_NAME = "text";
public static final int MAXIMUM_NUMBER_COLUMNS = 64 * 1024;
@@ -111,121 +97,19 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
public static final String WRITER_OPERATOR_TYPE = "TEXT_WRITER";
- @JsonTypeName(PLUGIN_NAME)
- @JsonInclude(Include.NON_DEFAULT)
- public static class TextFormatConfig implements FormatPluginConfig {
-
- public final List<String> extensions;
- public final String lineDelimiter;
- public final char fieldDelimiter;
- public final char quote;
- public final char escape;
- public final char comment;
- public final boolean skipFirstLine;
- public final boolean extractHeader;
-
- @JsonCreator
- public TextFormatConfig(
- @JsonProperty("extensions") List<String> extensions,
- @JsonProperty("lineDelimiter") String lineDelimiter,
- // Drill 1.17 and before used "delimiter" in the
- // bootstrap storage plugins file, assume many instances
- // exist in the field.
- @JsonAlias("delimiter")
- @JsonProperty("fieldDelimiter") String fieldDelimiter,
- @JsonProperty("quote") String quote,
- @JsonProperty("escape") String escape,
- @JsonProperty("comment") String comment,
- @JsonProperty("skipFirstLine") Boolean skipFirstLine,
- @JsonProperty("extractHeader") Boolean extractHeader) {
- this.extensions = extensions == null ?
- ImmutableList.of() : ImmutableList.copyOf(extensions);
- this.lineDelimiter = lineDelimiter == null ? "\n" : lineDelimiter;
- this.fieldDelimiter = Strings.isNullOrEmpty(fieldDelimiter) ? ',' : fieldDelimiter.charAt(0);
- this.quote = Strings.isNullOrEmpty(quote) ? '"' : quote.charAt(0);
- this.escape = Strings.isNullOrEmpty(escape) ? '"' : escape.charAt(0);
- this.comment = Strings.isNullOrEmpty(comment) ? '#' : comment.charAt(0);
- this.skipFirstLine = skipFirstLine != null && skipFirstLine;
- this.extractHeader = extractHeader != null && extractHeader;
- }
-
- public TextFormatConfig() {
- this(null, null, null, null, null, null, null, null);
- }
-
- public List<String> getExtensions() { return extensions; }
- public String getLineDelimiter() { return lineDelimiter; }
- public char getFieldDelimiter() { return fieldDelimiter; }
- public char getQuote() { return quote; }
- public char getEscape() { return escape; }
- public char getComment() { return comment; }
- public boolean isSkipFirstLine() { return skipFirstLine; }
- @JsonProperty("extractHeader")
- public boolean isHeaderExtractionEnabled() { return extractHeader; }
-
- @Override
- public int hashCode() {
- return Objects.hash(extensions, lineDelimiter, fieldDelimiter,
- quote, escape, comment, skipFirstLine, extractHeader);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- TextFormatConfig other = (TextFormatConfig) obj;
- return Objects.equals(extensions, other.extensions) &&
- Objects.equals(lineDelimiter, other.lineDelimiter) &&
- Objects.equals(fieldDelimiter, other.fieldDelimiter) &&
- Objects.equals(quote, other.quote) &&
- Objects.equals(escape, other.escape) &&
- Objects.equals(comment, other.comment) &&
- Objects.equals(skipFirstLine, other.skipFirstLine) &&
- Objects.equals(extractHeader, other.extractHeader);
- }
-
- @Override
- public String toString() {
- return new PlanStringBuilder(this)
- .field("extensions", extensions)
- .field("skipFirstLine", skipFirstLine)
- .field("extractHeader", extractHeader)
- .escapedField("fieldDelimiter", fieldDelimiter)
- .escapedField("lineDelimiter", lineDelimiter)
- .escapedField("quote", quote)
- .escapedField("escape", escape)
- .escapedField("comment", comment)
- .toString();
- }
- }
-
- /**
- * Builds the readers for the V3 text scan operator.
- */
- private static class ColumnsReaderFactory extends FileReaderFactory {
-
+ private static class TextReaderFactory extends FileReaderFactory {
private final TextParsingSettings settings;
- private final int maxRecords;
- public ColumnsReaderFactory(TextParsingSettings settings, int maxRecords) {
+ public TextReaderFactory(TextParsingSettings settings) {
this.settings = settings;
- this.maxRecords = maxRecords;
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new CompliantTextBatchReader(settings, maxRecords);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new CompliantTextBatchReader(negotiator, settings);
}
}
- public TextFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig storageConfig) {
- this(name, context, fsConf, storageConfig, new TextFormatConfig());
- }
-
public TextFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig config,
TextFormatConfig formatPluginConfig) {
@@ -243,7 +127,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
.fsConf(fsConf)
.defaultName(PLUGIN_NAME)
.writerOperatorType(WRITER_OPERATOR_TYPE)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.build();
}
@@ -267,19 +151,10 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
}
@Override
- protected FileScanBuilder frameworkBuilder(
- OptionManager options, EasySubScan scan) throws ExecutionSetupException {
- ColumnsScanBuilder builder = new ColumnsScanBuilder();
- initScanBuilder(builder, scan);
-
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
TextParsingSettings settings =
new TextParsingSettings(getConfig(), scan.getSchema());
- builder.setReaderFactory(new ColumnsReaderFactory(settings, scan.getMaxRecords()));
-
- // If this format has no headers, or wants to skip them,
- // then we must use the columns column to hold the data.
- builder.requireColumnsArray(
- ! settings.isHeaderExtractionEnabled() && builder.providedSchema() == null);
+ builder.readerFactory(new TextReaderFactory(settings));
// Text files handle nulls in an unusual way. Missing columns
// are set to required Varchar and filled with blanks. Yes, this
@@ -288,23 +163,18 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
// files have been defined within Drill.
builder.nullType(Types.required(MinorType.VARCHAR));
- // The text readers use required Varchar columns to represent null columns.
- builder.allowRequiredNullColumns(true);
-
// Provide custom error context
builder.errorContext(
new ChildErrorContext(builder.errorContext()) {
@Override
public void addContext(UserException.Builder builder) {
super.addContext(builder);
- builder.addContext("Extract headers:",
+ builder.addContext("Extract headers",
Boolean.toString(getConfig().isHeaderExtractionEnabled()));
- builder.addContext("Skip first line:",
+ builder.addContext("Skip first line",
Boolean.toString(getConfig().isSkipFirstLine()));
}
});
-
- return builder;
}
@Override
@@ -342,14 +212,8 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
for (final CompleteFileWork work : scan.getWorkIterable()) {
data += work.getTotalBytes();
}
-
final double estimatedRowSize = settings.getOptions().getOption(ExecConstants.TEXT_ESTIMATED_ROW_SIZE);
-
- if (scan.supportsLimitPushdown() && scan.getLimit() > 0) {
- return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, scan.getLimit(), 1, data);
- } else {
- final double estRowCount = data / estimatedRowSize;
- return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
- }
+ final double estRowCount = data / estimatedRowSize;
+ return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, (long) estRowCount, 1, data);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
index 4c0c698..934b306 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/CompliantTextBatchReader.java
@@ -17,60 +17,66 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-import com.univocity.parsers.common.TextParsingException;
-import io.netty.buffer.DrillBuf;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsScanFramework;
-import org.apache.drill.exec.physical.impl.scan.columns.ColumnsSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.convert.StandardConversions;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.v3.schema.ProjectedColumn;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
import org.apache.drill.exec.vector.accessor.ValueWriter;
import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InputStream;
+import com.univocity.parsers.common.TextParsingException;
+
+import io.netty.buffer.DrillBuf;
/**
* Text reader; complies with the RFC 4180 standard for text/csv files.
*/
-public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNegotiator> {
+public class CompliantTextBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(CompliantTextBatchReader.class);
+ public static final String COLUMNS_COL = "columns";
private static final int MAX_RECORDS_PER_BATCH = 8096;
private static final int READ_BUFFER = 1024 * 1024;
private static final int WHITE_SPACE_BUFFER = 64 * 1024;
// settings to be used while parsing
private final TextParsingSettings settings;
- // Chunk of the file to be read by this reader
- private FileSplit split;
- // Limit pushed down from the query
- private final int maxRecords;
+ private final CustomErrorContext errorContext;
// text reader implementation
- private TextReader reader;
+ private final TextReader reader;
// input buffer
- private DrillBuf readBuffer;
+ private final DrillBuf readBuffer;
// working buffer to handle whitespace
- private DrillBuf whitespaceBuffer;
- private DrillFileSystem dfs;
+ private final DrillBuf whitespaceBuffer;
private RowSetLoader writer;
- public CompliantTextBatchReader(TextParsingSettings settings, int maxRecords) {
+ /**
+ * Create and open the text reader.
+ * @throws EarlyEofException
+ */
+ public CompliantTextBatchReader(FileSchemaNegotiator schemaNegotiator,
+ TextParsingSettings settings) throws EarlyEofException {
this.settings = settings;
- this.maxRecords = maxRecords;
+ this.errorContext = schemaNegotiator.parentErrorContext();
// Validate. Otherwise, these problems show up later as a data
// read error which is very confusing.
@@ -78,23 +84,11 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
throw UserException
.validationError()
.message("The text format line delimiter cannot be blank.")
+ .addContext(errorContext)
.build(logger);
}
- }
- /**
- * Performs the initial setup required for the record reader.
- * Initializes the input stream, handling of the output record batch
- * and the actual reader to be used.
- *
- * @param schemaNegotiator Used to create the schema in the output record batch
- * @return true if opens successfully, false if output is null
- */
- @Override
- public boolean open(ColumnsSchemaNegotiator schemaNegotiator) {
final OperatorContext context = schemaNegotiator.context();
- dfs = schemaNegotiator.fileSystem();
- split = schemaNegotiator.split();
// Note: DO NOT use managed buffers here. They remain in existence
// until the fragment is shut down. The buffers here are large.
@@ -102,9 +96,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
// holding onto 1 GB of memory in managed buffers.
// Instead, we allocate the buffers explicitly, and must free
// them.
-
- readBuffer = context.getAllocator().buffer(READ_BUFFER);
- whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
+ this.readBuffer = context.getAllocator().buffer(READ_BUFFER);
+ this.whitespaceBuffer = context.getAllocator().buffer(WHITE_SPACE_BUFFER);
schemaNegotiator.batchSize(MAX_RECORDS_PER_BATCH);
// setup Output, Input, and Reader
@@ -116,12 +109,14 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
output = openWithoutHeaders(schemaNegotiator);
}
if (output == null) {
- return false;
+ throw new EarlyEofException();
}
- openReader(output);
- return true;
+ this.reader = openReader(schemaNegotiator, output);
} catch (final IOException e) {
- throw UserException.dataReadError(e).addContext("File Path", split.getPath().toString()).build(logger);
+ throw UserException.dataReadError(e)
+ .addContext("File open failed")
+ .addContext(errorContext)
+ .build(logger);
}
}
@@ -129,17 +124,14 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
* Extract header and use that to define the reader schema.
*
* @param schemaNegotiator used to define the reader schema
- * provided schema, if any. Used when using a provided schema
- * with a text file that contains no headers; ignored for
- * text file with headers
- * @return text output
*/
- private TextOutput openWithHeaders(ColumnsSchemaNegotiator schemaNegotiator) throws IOException {
- final String [] fieldNames = extractHeader();
+ private TextOutput openWithHeaders(FileSchemaNegotiator schemaNegotiator) throws IOException {
+ validateNoColumnsProjection(schemaNegotiator);
+ final String [] fieldNames = extractHeader(schemaNegotiator);
if (fieldNames == null) {
return null;
}
- if (schemaNegotiator.hasProvidedSchema()) {
+ if (schemaNegotiator.providedSchema() != null) {
return buildWithSchema(schemaNegotiator, fieldNames);
} else {
return buildFromColumnHeaders(schemaNegotiator, fieldNames);
@@ -150,30 +142,25 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
* File has headers and a provided schema is provided. Convert from VARCHAR
* input type to the provided output type, but only if the column is projected.
*/
- private FieldVarCharOutput buildWithSchema(ColumnsSchemaNegotiator schemaNegotiator,
+ private FieldVarCharOutput buildWithSchema(FileSchemaNegotiator schemaNegotiator,
String[] fieldNames) {
- TupleMetadata readerSchema = mergeSchemas(schemaNegotiator.providedSchema(), fieldNames);
- schemaNegotiator.tableSchema(readerSchema, true);
- writer = schemaNegotiator.build().writer();
- StandardConversions conversions = conversions(schemaNegotiator.providedSchema());
- ValueWriter[] colWriters = new ValueWriter[fieldNames.length];
- for (int i = 0; i < fieldNames.length; i++) {
- ScalarWriter colWriter = writer.scalar(fieldNames[i]);
- if (writer.isProjected()) {
- colWriters[i] = conversions.converterFor(colWriter, MinorType.VARCHAR);
- } else {
- colWriters[i] = colWriter;
- }
- }
- return new FieldVarCharOutput(writer, colWriters);
+
+ TupleMetadata readerSchema = buildSchemaFromHeaders(fieldNames);
+
+ // Build converting column writers
+ FixedReceiver.Builder builder = FixedReceiver.builderFor(schemaNegotiator)
+ .schemaIsComplete();
+ builder.conversionBuilder().blankAs(ColumnMetadata.BLANK_AS_NULL);
+ FixedReceiver receiver = builder.build(readerSchema);
+ writer = receiver.rowWriter();
+ return new FieldVarCharOutput(receiver);
}
- private TupleMetadata mergeSchemas(TupleMetadata providedSchema,
- String[] fieldNames) {
- final TupleMetadata readerSchema = new TupleSchema();
- for (String fieldName : fieldNames) {
- final ColumnMetadata providedCol = providedSchema.metadata(fieldName);
- readerSchema.addColumn(providedCol == null ? textColumn(fieldName) : providedCol);
+ private TupleMetadata buildSchemaFromHeaders(String[] fieldNames) {
+ // Build table schema from headers
+ TupleMetadata readerSchema = new TupleSchema();
+ for (String name : fieldNames) {
+ readerSchema.addColumn(textColumn(name));
}
return readerSchema;
}
@@ -186,13 +173,10 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
* File has column headers. No provided schema. Build schema from the
* column headers.
*/
- private FieldVarCharOutput buildFromColumnHeaders(ColumnsSchemaNegotiator schemaNegotiator,
+ private FieldVarCharOutput buildFromColumnHeaders(FileSchemaNegotiator schemaNegotiator,
String[] fieldNames) {
- final TupleMetadata schema = new TupleSchema();
- for (final String colName : fieldNames) {
- schema.addColumn(textColumn(colName));
- }
- schemaNegotiator.tableSchema(schema, true);
+ TupleMetadata readerSchema = buildSchemaFromHeaders(fieldNames);
+ schemaNegotiator.tableSchema(readerSchema, true);
writer = schemaNegotiator.build().writer();
ValueWriter[] colWriters = new ValueWriter[fieldNames.length];
for (int i = 0; i < fieldNames.length; i++) {
@@ -205,53 +189,112 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
* When no headers, create a single array column "columns".
*/
private TextOutput openWithoutHeaders(
- ColumnsSchemaNegotiator schemaNegotiator) {
- if (schemaNegotiator.hasProvidedSchema()) {
+ FileSchemaNegotiator schemaNegotiator) {
+ // Treat a property-only schema as no schema
+ TupleMetadata providedSchema = schemaNegotiator.providedSchema();
+ if (providedSchema != null && providedSchema.size() > 0) {
return buildWithSchema(schemaNegotiator);
} else {
return buildColumnsArray(schemaNegotiator);
}
}
- private FieldVarCharOutput buildWithSchema(ColumnsSchemaNegotiator schemaNegotiator) {
- TupleMetadata providedSchema = schemaNegotiator.providedSchema();
- schemaNegotiator.tableSchema(providedSchema, true);
- writer = schemaNegotiator.build().writer();
- StandardConversions conversions = conversions(providedSchema);
- ValueWriter[] colWriters = new ValueWriter[providedSchema.size()];
- for (int i = 0; i < colWriters.length; i++) {
- colWriters[i] = conversions.converterFor(
- writer.scalar(providedSchema.metadata(i).name()), MinorType.VARCHAR);
+ private FieldVarCharOutput buildWithSchema(FileSchemaNegotiator schemaNegotiator) {
+ validateNoColumnsProjection(schemaNegotiator);
+
+ // Build table schema from provided
+ TupleMetadata readerSchema = new TupleSchema();
+ for (ColumnMetadata providedCol : schemaNegotiator.providedSchema()) {
+ readerSchema.addColumn(textColumn(providedCol.name()));
+ }
+
+ // Build converting column writers
+ FixedReceiver.Builder builder = FixedReceiver.builderFor(schemaNegotiator)
+ .schemaIsComplete();
+ builder.conversionBuilder().blankAs(ColumnMetadata.BLANK_AS_NULL);
+ FixedReceiver receiver = builder.build(readerSchema);
+
+ // Convert to format for this reader
+ writer = receiver.rowWriter();
+ return new ConstrainedFieldOutput(receiver);
+ }
+
+ private void validateNoColumnsProjection(FileSchemaNegotiator schemaNegotiator) {
+ // If we do not require the columns array, then we presume that
+ // the reader does not provide arrays, so any use of the columns[x]
+ // column is likely an error. We rely on the plugin's own error
+ // context to fill in information that would explain the issue
+ // in the context of that plugin.
+
+ ProjectedColumn colProj = schemaNegotiator.projectionFor(COLUMNS_COL);
+ if (colProj != null && colProj.isArray()) {
+ throw UserException
+ .validationError()
+ .message("Unexpected `columns`[x]; file has headers or schema")
+ .addContext(errorContext)
+ .build(logger);
}
- return new ConstrainedFieldOutput(writer, colWriters);
}
private TextOutput buildColumnsArray(
- ColumnsSchemaNegotiator schemaNegotiator) {
- schemaNegotiator.tableSchema(ColumnsScanFramework.columnsSchema(), true);
+ FileSchemaNegotiator schemaNegotiator) {
+ ProjectedColumn colProj = schemaNegotiator.projectionFor(COLUMNS_COL);
+ validateColumnsProjection(colProj);
+ schemaNegotiator.tableSchema(columnsSchema(), true);
writer = schemaNegotiator.build().writer();
- return new RepeatedVarCharOutput(writer, schemaNegotiator.projectedIndexes());
+ return new RepeatedVarCharOutput(writer,
+ colProj == null ? null : colProj.indexes());
+ }
+
+ private void validateColumnsProjection(ProjectedColumn colProj) {
+ if (colProj == null) {
+ return;
+ }
+
+ // The columns column cannot be a map. That is, the following is
+ // not allowed: columns.foo.
+
+ if (colProj.isMap()) {
+ throw UserException
+ .validationError()
+ .message("Column `%s` has map elements, but must be an array", colProj.name())
+ .addContext(errorContext)
+ .build(logger);
+ }
+
+ if (colProj.isArray()) {
+ int maxIndex = colProj.maxIndex();
+ if (maxIndex > TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS) {
+ throw UserException
+ .validationError()
+ .message("`columns`[%d] index out of bounds, max supported size is %d",
+ maxIndex, TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Column:", colProj.name())
+ .addContext("Maximum index:", TextFormatPlugin.MAXIMUM_NUMBER_COLUMNS)
+ .addContext("Actual index:", maxIndex)
+ .addContext(errorContext)
+ .build(logger);
+ }
+ }
}
- private void openReader(TextOutput output) throws IOException {
+ private TextReader openReader(FileSchemaNegotiator schemaNegotiator, TextOutput output) throws IOException {
+ FileSplit split = schemaNegotiator.file().split();
logger.trace("Opening file {}", split.getPath());
- final InputStream stream = dfs.openPossiblyCompressedStream(split.getPath());
+ final InputStream stream = schemaNegotiator.file().open();
final TextInput input = new TextInput(settings, stream, readBuffer,
split.getStart(), split.getStart() + split.getLength());
// setup Reader using Input and Output
- reader = new TextReader(settings, input, output, whitespaceBuffer);
+ TextReader reader = new TextReader(settings, input, output, whitespaceBuffer);
reader.start();
+ return reader;
}
- private StandardConversions conversions(TupleMetadata providedSchema) {
-
- // CSV maps blank columns to nulls (for nullable non-string columns),
- // or to the default value (for non-nullable non-string columns.)
- return StandardConversions.builder()
- .withSchema(providedSchema)
- .blankAs(ColumnMetadata.BLANK_AS_NULL)
- .build();
+ public static TupleMetadata columnsSchema() {
+ return new SchemaBuilder()
+ .addArray(COLUMNS_COL, MinorType.VARCHAR)
+ .buildSchema();
}
/**
@@ -260,31 +303,31 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
* TODO: enhance to support more common header patterns
* @return field name strings
*/
- private String[] extractHeader() throws IOException {
+ private String[] extractHeader(FileSchemaNegotiator schemaNegotiator) throws IOException {
assert settings.isHeaderExtractionEnabled();
// don't skip header in case skipFirstLine is set true
settings.setSkipFirstLine(false);
+ FileSplit split = schemaNegotiator.file().split();
+ logger.trace("Opening file {}", split.getPath());
+ final InputStream hStream = schemaNegotiator.file().open();
final HeaderBuilder hOutput = new HeaderBuilder(split.getPath());
- // setup Input using InputStream
// we should read the file header irrespective of the split given to this reader
- final InputStream hStream = dfs.openPossiblyCompressedStream(split.getPath());
final TextInput hInput = new TextInput(settings, hStream, readBuffer, 0, split.getLength());
- // setup Reader using Input and Output
- this.reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer);
- reader.start();
+ final String [] fieldNames;
+ try (TextReader reader = new TextReader(settings, hInput, hOutput, whitespaceBuffer)) {
+ reader.start();
- // extract first row only
- reader.parseNext();
+ // extract first row only
+ reader.parseNext();
- // grab the field names from output
- final String [] fieldNames = hOutput.getHeaders();
+ // grab the field names from output
+ fieldNames = hOutput.getHeaders();
+ }
- // cleanup and set to skip the first line next time we read input
- reader.close();
settings.setSkipFirstLine(true);
readBuffer.clear();
@@ -300,14 +343,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
public boolean next() {
reader.resetForNextBatch();
- // If the limit is defined and the row count is greater than the limit, stop reading the file.
- if (maxRecords > 0 && writer.rowCount() > maxRecords) {
- return false;
- }
-
try {
boolean more = false;
- while (! writer.isFull()) {
+ while (!writer.isFull()) {
more = reader.parseNext();
if (! more) {
break;
@@ -324,8 +362,9 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
throw (UserException) e.getCause();
}
throw UserException.dataReadError(e)
- .addContext("Failure while reading file %s. Happened at or shortly before byte position %d.",
- split.getPath(), reader.getPos())
+ .addContext("Failure while reading file")
+ .addContext("Happened at or shortly before byte position", reader.getPos())
+ .addContext(errorContext)
.build(logger);
}
}
@@ -339,22 +378,8 @@ public class CompliantTextBatchReader implements ManagedReader<ColumnsSchemaNego
// Release the buffers allocated above; closeSilently() below absorbs
// any error from closing the reader.
-
- if (readBuffer != null) {
- readBuffer.release();
- readBuffer = null;
- }
- if (whitespaceBuffer != null) {
- whitespaceBuffer.release();
- whitespaceBuffer = null;
- }
- try {
- if (reader != null) {
- reader.close();
- reader = null;
- }
- } catch (final IOException e) {
- logger.warn("Exception while closing stream.", e);
- }
+ readBuffer.release();
+ whitespaceBuffer.release();
+ AutoCloseables.closeSilently(reader);
}
}
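
A note on the lifecycle adopted above: under EVF V2 the reader acquires its file and row writer through the FileSchemaNegotiator in its constructor, fills batches in next(), and releases resources in close(). The following is a minimal sketch of that shape, not part of this commit: negotiator calls other than file().open() (tableSchema(), build()) are assumed from the EVF V2 API, and the class and column names are illustrative.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExampleLineReader implements ManagedReader {
  private static final Logger logger = LoggerFactory.getLogger(ExampleLineReader.class);

  private final RowSetLoader writer;
  private final BufferedReader in;

  public ExampleLineReader(FileSchemaNegotiator negotiator) throws IOException {
    // Declare a one-column schema, then obtain the row writer.
    negotiator.tableSchema(new SchemaBuilder()
        .add("line", MinorType.VARCHAR)
        .buildSchema(), true);
    writer = negotiator.build().writer();
    in = new BufferedReader(new InputStreamReader(
        negotiator.file().open(), StandardCharsets.UTF_8));
  }

  @Override
  public boolean next() {
    try {
      // Fill one batch; the result set loader says when it is full.
      while (!writer.isFull()) {
        String line = in.readLine();
        if (line == null) {
          return false;      // EOF: this was the last batch
        }
        writer.start();
        writer.scalar(0).setString(line);
        writer.save();
      }
      return true;           // batch is full; more input remains
    } catch (IOException e) {
      throw UserException.dataReadError(e).build(logger);
    }
  }

  @Override
  public void close() {
    AutoCloseables.closeSilently(in);
  }
}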
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
index 5078a57..56b6e7c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/ConstrainedFieldOutput.java
@@ -17,8 +17,7 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
-import org.apache.drill.exec.physical.resultSet.RowSetLoader;
-import org.apache.drill.exec.vector.accessor.ValueWriter;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
/**
* For CSV files without headers, but with a provided schema,
@@ -27,8 +26,8 @@ import org.apache.drill.exec.vector.accessor.ValueWriter;
*/
public class ConstrainedFieldOutput extends FieldVarCharOutput {
- ConstrainedFieldOutput(RowSetLoader writer, ValueWriter[] colWriters) {
- super(writer, colWriters);
+ ConstrainedFieldOutput(FixedReceiver receiver) {
+ super(receiver);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
index f8e44f7..f3aa08f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/FieldVarCharOutput.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.easy.text.reader;
+import org.apache.drill.exec.physical.impl.scan.v3.FixedReceiver;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
@@ -45,6 +46,19 @@ class FieldVarCharOutput extends BaseFieldOutput {
this.colWriters = colWriters;
}
+ FieldVarCharOutput(FixedReceiver receiver) {
+ this(receiver.rowWriter(), makeWriters(receiver));
+ }
+
+ private static ValueWriter[] makeWriters(FixedReceiver receiver) {
+ final TupleMetadata schema = receiver.rowWriter().tupleSchema();
+ final ValueWriter[] colWriters = new ValueWriter[schema.size()];
+ for (int i = 0; i < schema.size(); i++) {
+ colWriters[i] = receiver.scalar(i);
+ }
+ return colWriters;
+ }
+
private static boolean[] makeMask(RowSetLoader writer) {
final TupleMetadata schema = writer.tupleSchema();
final boolean[] projectionMask = new boolean[schema.size()];
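
The FieldVarCharOutput constructor added above bridges EVF V2's FixedReceiver to the reader's existing ValueWriter-array model: makeWriters() fetches one writer per schema column up front, so the parsing loop never performs a per-field lookup. A hypothetical sketch of the resulting write path (the real equivalent lives in BaseFieldOutput, which is not shown in this diff):

// Column i was mapped to receiver.scalar(i) by makeWriters(). For a
// plain VARCHAR column the writer copies the bytes as-is; for a column
// with a provided schema, the writer from FixedReceiver converts them.
void writeField(int fieldIndex, byte[] fieldBytes, int len) {
  colWriters[fieldIndex].setBytes(fieldBytes, len);
}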
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
index 2028d24..bf0e267 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextParsingSettings.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.store.easy.text.reader;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.easy.text.TextFormatPlugin;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
public class TextParsingSettings {
@@ -40,7 +40,6 @@ public class TextParsingSettings {
private final boolean parseUnescapedQuotes;
private final boolean ignoreLeadingWhitespace;
private final boolean ignoreTrailingWhitespace;
-
/**
* Configure the properties for this one scan based on:
* <p>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
index e3a5edd..c67c681 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/reader/TextReader.java
@@ -31,10 +31,10 @@ import java.io.IOException;
******************************************************************************/
/**
- * A byte-based Text parser implementation. Builds heavily upon the uniVocity parsers. Customized for UTF8 parsing and
- * DrillBuf support.
+ * A byte-based Text parser implementation. Builds heavily upon the uniVocity
+ * parsers. Customized for UTF8 parsing and DrillBuf support.
*/
-public final class TextReader {
+public final class TextReader implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(TextReader.class);
@@ -515,6 +515,7 @@ public final class TextReader {
* current record reader to clean up state.
* @throws IOException for input file read errors
*/
+ @Override
public void close() throws IOException {
input.close();
}
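
Making TextReader AutoCloseable enables the two cleanup idioms used earlier in this patch: try-with-resources for the short-lived header pass, and AutoCloseables.closeSilently() for the reader held across batches. In sketch form, with variables as in CompliantTextBatchReader:

// Header pass: close() runs even if start() or parseNext() throws.
try (TextReader headerReader = new TextReader(settings, hInput, hOutput, whitespaceBuffer)) {
  headerReader.start();
  headerReader.parseNext();   // first row only: the header line
}

// Batch reader: held as a field, released in close(); closeSilently()
// logs rather than propagates any IOException from TextReader.close().
AutoCloseables.closeSilently(reader);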
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
index 58bdb73..7cccee6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestTextWriter.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.writer;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
@@ -39,7 +40,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
import static org.junit.Assert.assertEquals;
public class TestTextWriter extends ClusterTest {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
index 14a3f43..b4d25a0 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
@@ -30,7 +30,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
index 317c0c7..b48f29f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/DropboxFileSystemTest.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.exec.store.easy.json.JSONFormatPlugin.JSONFormatConfig;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
import org.apache.drill.test.rowSet.RowSetComparison;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
index 16553e4..ab315c6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestPluginRegistry.java
@@ -42,7 +42,7 @@ import org.apache.drill.exec.store.StoragePluginRegistry.PluginException;
import org.apache.drill.exec.store.StoragePluginRegistry.PluginFilter;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.common.logical.security.PlainCredentialsProvider;
import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.test.BaseTest;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 88a7b94..1a9175b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.RunTimeScan;
import org.apache.drill.common.scanner.persistence.ScanResult;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.test.BaseTest;
import org.junit.Test;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
index ff45968..6919481 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/BaseCsvTest.java
@@ -23,7 +23,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterTest;
@@ -148,6 +148,17 @@ public class BaseCsvTest extends ClusterTest {
return fileName;
}
+ protected String buildBiggishFile() throws IOException {
+ String fileName = "biggish.csv";
+ try (PrintWriter out = new PrintWriter(new FileWriter(new File(testDir, fileName)))) {
+ out.println("id");
+ for (int i = 0; i < 100; i++) {
+ out.println(i + 1);
+ }
+ }
+ return fileName;
+ }
+
protected static final String FILE_N_NAME = "file%d.csv";
protected static String buildTable(String tableName, String[]...fileContents) throws IOException {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
index a0bfbbf..ec90618 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithHeaders.java
@@ -115,7 +115,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
* <p>
* Prior research revealed that most DB engines can handle an entirely
* empty result set: no schema, no rows. For example:
- * <br><tt>SELECT * FROM VALUES ();</tt><br>
+ * <br>{@code SELECT * FROM VALUES ()}<br>
* The implementation tested here follows that pattern.
*
* @see TestCsvWithoutHeaders#testEmptyFile()
@@ -127,7 +127,6 @@ public class TestCsvWithHeaders extends BaseCsvTest {
assertNull(rowSet);
// Try again with COUNT(*)
-
long count = client.queryBuilder().sql(COUNT_STAR, EMPTY_FILE).singletonLong();
assertEquals(0, count);
}
@@ -343,7 +342,7 @@ public class TestCsvWithHeaders extends BaseCsvTest {
}
/**
- * V3 allows the use of partition columns, even for a non-partitioned file.
+ * Drill allows the use of partition columns, even for a non-partitioned file.
* The columns are null of type Nullable VARCHAR. This area of Drill
* is a bit murky: it seems reasonable to support partition columns consistently
* rather than conditionally based on the structure of the input.
@@ -603,8 +602,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
client.queryBuilder().sql(sql, COLUMNS_FILE_NAME).run();
} catch (UserRemoteException e) {
assertTrue(e.getMessage().contains(
- "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
- assertTrue(e.getMessage().contains("Format plugin: text"));
+ "VALIDATION ERROR: Unexpected `columns`[x]; file has headers or schema"));
+ assertTrue(e.getMessage().contains("Format plugin type: text"));
assertTrue(e.getMessage().contains("Plugin config name: csv"));
assertTrue(e.getMessage().contains("Extract headers: true"));
assertTrue(e.getMessage().contains("Skip first line: false"));
@@ -639,8 +638,8 @@ public class TestCsvWithHeaders extends BaseCsvTest {
// Note: this error is caught before reading any tables,
// so no table information is available.
assertTrue(e.getMessage().contains(
- "VALIDATION ERROR: Unexpected `columns`[x]; columns array not enabled"));
- assertTrue(e.getMessage().contains("Format plugin: text"));
+ "VALIDATION ERROR: Unexpected `columns`[x]; file has headers or schema"));
+ assertTrue(e.getMessage().contains("Format plugin type: text"));
assertTrue(e.getMessage().contains("Plugin config name: csv"));
assertTrue(e.getMessage().contains("Extract headers: true"));
assertTrue(e.getMessage().contains("Skip first line: false"));
@@ -685,6 +684,33 @@ public class TestCsvWithHeaders extends BaseCsvTest {
RowSetUtilities.verify(expected, actual);
}
+ /**
+ * Test the LIMIT operation. There is no way to verify here that pushdown
+ * worked; verify it by setting a breakpoint.
+ */
+ @Test
+ public void testLimit() throws IOException {
+ String fileName = buildBiggishFile();
+ {
+ String stmt = makeStatement(fileName) + " LIMIT 10";
+ RowSet actual = client.queryBuilder().sql(stmt).rowSet();
+ assertEquals(10, actual.rowCount());
+ RowSetReader reader = actual.reader();
+ reader.next();
+ assertEquals("1", reader.scalar("id").getString());
+ actual.clear();
+ }
+ {
+ String stmt = makeStatement(fileName) + " LIMIT 10 OFFSET 20";
+ RowSet actual = client.queryBuilder().sql(stmt).rowSet();
+ assertEquals(10, actual.rowCount());
+ RowSetReader reader = actual.reader();
+ reader.next();
+ assertEquals("21", reader.scalar("id").getString());
+ actual.clear();
+ }
+ }
+
private String makeStatement(String fileName) {
return "SELECT * FROM `dfs.data`.`" + fileName + "`";
}
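
The two failure tests earlier in this file show that projecting the `columns` array is rejected once a file has headers or a schema. For reference, a sketch of the query shape that remains valid for header-less text files (the file name is illustrative):

// Valid only when the file has no headers and no provided schema:
// every field then arrives in a single VARCHAR array named `columns`.
String sql = "SELECT columns[0] AS id, columns[1] AS name " +
    "FROM `dfs.data`.`noheader.csv`";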
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
index 7358d59..bc8e93a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestCsvWithSchema.java
@@ -101,6 +101,62 @@ public class TestCsvWithSchema extends BaseCsvTest {
}
/**
+ * An empty file with a schema is valid. Although there is no header line,
+ * we've provided a schema so we know about at least those columns.
+ * In the no-schema case, we return no batches at all. In the with-schema
+ * case, we return an empty batch with schema.
+ * <p>
+ * The analogy is:
+ * <br>{@code SELECT * FROM foo WHERE 1 = 0}<br>
+ * The implementation tested here follows that pattern.
+ * @throws Exception
+ *
+ * @see TestCsvWithoutHeaders#testEmptyFile()
+ * @see TestCsvWithHeaders#testEmptyFile()
+ */
+ @Test
+ public void testEmptyFile() throws Exception {
+ String tablePath = buildTable("empty", new String[] {});
+ try {
+ enableSchemaSupport();
+ String schemaSql = "create schema (a VARCHAR) " +
+ "for table " + tablePath;
+ run(schemaSql);
+ String sql = "SELECT * FROM " + tablePath;
+ RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetSchemaSupport();
+ }
+ }
+
+ @Test
+ public void testEmptyFileInline() throws Exception {
+ String tablePath = buildTable("empty", new String[] {});
+ try {
+ enableSchemaSupport();
+ String sql = "SELECT * FROM TABLE(" + tablePath +
+ "(schema=>'inline=(`a` VARCHAR)'))";
+ RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .addNullable("a", MinorType.VARCHAR)
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetSchemaSupport();
+ }
+ }
+
+ /**
* Test the simplest possible case: a table with one file:
* <ul>
* <li>Column in projection, table, and schema</li>
@@ -139,6 +195,36 @@ public class TestCsvWithSchema extends BaseCsvTest {
}
}
+ @Test
+ public void testBasicInlineSchema() throws Exception {
+ String tablePath = buildTable("basic2", basicFileContents);
+
+ try {
+ enableSchemaSupport();
+ String sql =
+ "SELECT `intcol`, `datecol`, `str`, `dub`, `extra`, `missing`\n" +
+ " FROM TABLE(" + tablePath +
+ " (schema=>'inline=(intcol int not null, datecol date not null, " +
+ " `dub` double not null, `extra` bigint not null default ''20'')'))";
+ RowSet actual = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("intcol", MinorType.INT) // Has a schema
+ .add("datecol", MinorType.DATE) // Has a schema
+ .add("str", MinorType.VARCHAR) // No schema, retains type
+ .add("dub", MinorType.FLOAT8) // Has a schema
+ .add("extra", MinorType.BIGINT) // No data, has default value
+ .add("missing", MinorType.VARCHAR) // No data, no schema, default type
+ .buildSchema();
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(10, LocalDate.of(2019, 3, 20), "it works!", 1234.5D, 20L, "")
+ .build();
+ RowSetUtilities.verify(expected, actual);
+ } finally {
+ resetSchemaSupport();
+ }
+ }
+
/**
* Show that the projection framework generates a reasonable default value
* if told to create a required column that does not exist. In this case,
@@ -1024,7 +1110,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
"col_int integer not null default '10', " +
"col_bigint bigint not null default '10', " +
"col_double double not null default '10.5', " +
- "col_float float not null default '10.5f', " +
+ "col_float float not null default '10.5', " +
"col_var varchar not null default 'foo', " +
"col_boolean boolean not null default '1', " +
"col_interval interval not null default 'P10D', " +
@@ -1053,7 +1139,7 @@ public class TestCsvWithSchema extends BaseCsvTest {
LocalDate ld = LocalDate.of(2019, 3, 28);
Instant ts = LocalDateTime.of(ld, lt).toInstant(ZoneOffset.UTC);
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(10, 10L, 10.5, 10.5f, "foo", true, new Period(0).plusDays(10),
+ .addRow(10, 10L, 10.5, 10.5D, "foo", true, new Period(0).plusDays(10),
lt, ld, ts, "1")
.build();
RowSetUtilities.verify(expected, actual);
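
The schema tests above attach a schema in two ways, both worth keeping in mind when reading the expected-schema assertions: a per-query inline schema passed through a table function, and a persistent schema created for a table directory. Both shapes in sketch (paths illustrative):

// Per-query schema, scoped to this statement only:
String inlineSql = "SELECT * FROM TABLE(`dfs.data`.`example.csv`" +
    "(schema=>'inline=(`a` VARCHAR)'))";

// Persistent schema, stored alongside the table:
String createSql = "create schema (a VARCHAR) for table `dfs.data`.`example`";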
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
index 3798206..11a085b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/easy/text/compliant/TestTextReader.java
@@ -41,7 +41,7 @@ public class TestTextReader extends BaseTestQuery {
.go();
}
- @Ignore ("Not needed any more. (DRILL-3178)")
+ @Ignore("Not needed any more. (DRILL-3178)")
@Test
public void ensureFailureOnNewLineDelimiterWithinQuotes() {
try {
@@ -53,6 +53,10 @@ public class TestTextReader extends BaseTestQuery {
}
@Test
+ @Ignore("Query succeeds with EVF V2")
+ // This test should be modified for some other case. Note that the test is
+ // a bit inadequate: there are many places that can throw a validation
+ // error, all of which should be tested.
public void ensureColumnNameDisplayedinError() throws Exception {
final String COL_NAME = "col1";
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
index cdf0e8c..ad207ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/mock/TestMockRowReader.java
@@ -49,13 +49,14 @@ import org.junit.experimental.categories.Category;
/**
* Tests the mock data source directly by wrapping it in a mock
- * scan operator, without the rest of Drill.
+ * scan operator, without the rest of Drill. A side effect is that this
+ * also tests the scan mechanism itself.
*/
-
@Category({RowSetTests.class, UnlikelyTest.class})
public class TestMockRowReader extends SubOperatorTest {
- private static ScanFixture buildScan(MockSubScanPOP config, List<ManagedReader<SchemaNegotiator>> readers) {
+ private static ScanFixture buildScan(MockSubScanPOP config,
+ List<ManagedReader<SchemaNegotiator>> readers) {
BaseScanFixtureBuilder builder = new BaseScanFixtureBuilder();
List<SchemaPath> projList = new ArrayList<>();
projList.add(SchemaPath.STAR_COLUMN);
@@ -67,7 +68,6 @@ public class TestMockRowReader extends SubOperatorTest {
/**
* Test the most basic case: required integers and strings.
*/
-
@Test
public void testBasics() {
int rowCount = 10;
@@ -117,7 +117,6 @@ public class TestMockRowReader extends SubOperatorTest {
* including filling values with nulls at some percentage, 25% by
* default.
*/
-
@Test
public void testOptional() {
int rowCount = 10;
@@ -165,7 +164,6 @@ public class TestMockRowReader extends SubOperatorTest {
/**
* Test a repeated column.
*/
-
@Test
public void testColumnRepeat() {
int rowCount = 10;
@@ -214,7 +212,6 @@ public class TestMockRowReader extends SubOperatorTest {
/**
* Verify limit on individual batch size (limiting row count per batch).
*/
-
@Test
public void testBatchSize() {
int rowCount = 20;
@@ -259,7 +256,6 @@ public class TestMockRowReader extends SubOperatorTest {
/**
* Test a mock varchar column large enough to cause vector overflow.
*/
-
@Test
public void testOverflow() {
int rowCount = ValueVector.MAX_ROW_COUNT;
@@ -271,7 +267,6 @@ public class TestMockRowReader extends SubOperatorTest {
MockSubScanPOP config = new MockSubScanPOP("dummy", true, Collections.singletonList(entry));
ManagedReader<SchemaNegotiator> reader = new ExtendedMockBatchReader(entry);
- @SuppressWarnings("unchecked")
List<ManagedReader<SchemaNegotiator>> readers = Collections.singletonList(reader);
// Create options and the scan operator
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
index 2cd1a0c..9ca931a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/util/StoragePluginTestUtils.java
@@ -31,7 +31,7 @@ import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.WorkspaceConfig;
import org.apache.drill.exec.store.easy.sequencefile.SequenceFileFormatConfig;
-import org.apache.drill.exec.store.easy.text.TextFormatPlugin.TextFormatConfig;
+import org.apache.drill.exec.store.easy.text.TextFormatConfig;
/**
* Utility methods to speed up tests.