You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/04/03 21:46:06 UTC
[drill] branch master updated: DRILL-8186: Convert XML format to EVF2 (#2512)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new e19081fe32 DRILL-8186: Convert XML format to EVF2 (#2512)
e19081fe32 is described below
commit e19081fe327c79164b75a3fc19332898c02bd0ca
Author: luoc <lu...@apache.org>
AuthorDate: Mon Apr 4 05:46:02 2022 +0800
DRILL-8186: Convert XML format to EVF2 (#2512)
---
.../drill/exec/store/xml/XMLBatchReader.java | 48 +++++++++-------------
.../drill/exec/store/xml/XMLFormatConfig.java | 2 +-
.../drill/exec/store/xml/XMLFormatPlugin.java | 37 +++++++----------
.../org/apache/drill/exec/store/xml/XMLReader.java | 9 ++--
.../drill/exec/store/http/HttpXMLBatchReader.java | 2 +-
5 files changed, 40 insertions(+), 58 deletions(-)
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
index efe1c0da39..52a2b6d903 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
@@ -18,35 +18,32 @@
package org.apache.drill.exec.store.xml;
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
-
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.mapred.FileSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.InputStream;
-
-public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class XMLBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(XMLBatchReader.class);
- private FileSplit split;
- private RowSetLoader rootRowWriter;
- private CustomErrorContext errorContext;
+ private final FileDescrip file;
+ private final RowSetLoader rootRowWriter;
+ private final CustomErrorContext errorContext;
private XMLReader reader;
- private final int maxRecords;
private final int dataLevel;
-
static class XMLReaderConfig {
final XMLFormatPlugin plugin;
final int dataLevel;
@@ -57,20 +54,15 @@ public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
}
}
- public XMLBatchReader(XMLReaderConfig readerConfig, EasySubScan scan) {
- this.maxRecords = scan.getMaxRecords();
- this.dataLevel = readerConfig.dataLevel;
- }
+ public XMLBatchReader(XMLReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+ errorContext = negotiator.parentErrorContext();
+ dataLevel = readerConfig.dataLevel;
+ file = negotiator.file();
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- split = negotiator.split();
ResultSetLoader loader = negotiator.build();
- errorContext = negotiator.parentErrorContext();
rootRowWriter = loader.writer();
- openFile(negotiator);
- return true;
+ openFile();
}
@Override
@@ -80,18 +72,18 @@ public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
@Override
public void close() {
- reader.close();
+ AutoCloseables.closeSilently(reader);
}
- private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+ private void openFile() {
try {
- InputStream fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
- reader = new XMLReader(fsStream, dataLevel, maxRecords);
+ InputStream fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+ reader = new XMLReader(fsStream, dataLevel);
reader.open(rootRowWriter, errorContext);
} catch (Exception e) {
throw UserException
.dataReadError(e)
- .message(String.format("Failed to open input file: %s", split.getPath().toString()))
+ .message(String.format("Failed to open input file: %s", file.split().getPath().toString()))
.addContext(errorContext)
.addContext(e.getMessage())
.build(logger);
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
index 0babf2091f..fbd44ad4d6 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
@@ -76,4 +76,4 @@ public class XMLFormatConfig implements FormatPluginConfig {
.field("dataLevel", dataLevel)
.toString();
}
-}
\ No newline at end of file
+}
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
index 14ce06968b..5b270e53b6 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
@@ -21,21 +21,22 @@ package org.apache.drill.exec.store.xml;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
import org.apache.hadoop.conf.Configuration;
public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
public static final String DEFAULT_NAME = "xml";
+ public static final String OPERATOR_TYPE = "XML_SUB_SCAN";
- public static class XMLReaderFactory extends FileScanFramework.FileReaderFactory {
+ public static class XMLReaderFactory extends FileReaderFactory {
private final XMLBatchReader.XMLReaderConfig readerConfig;
private final EasySubScan scan;
@@ -45,8 +46,8 @@ public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
}
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
- return new XMLBatchReader(readerConfig, scan);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new XMLBatchReader(readerConfig, scan, negotiator);
}
}
@@ -64,27 +65,19 @@ public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
.writable(false)
.blockSplittable(false)
.compressible(true)
- .supportsProjectPushdown(true)
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
- .defaultName(DEFAULT_NAME)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .readerOperatorType(OPERATOR_TYPE)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
+ .supportsProjectPushdown(true)
+ .defaultName(XMLFormatPlugin.DEFAULT_NAME)
.build();
}
@Override
- public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newBatchReader(
- EasySubScan scan, OptionManager options) {
- return new XMLBatchReader(formatConfig.getReaderConfig(this), scan);
- }
-
- @Override
- protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new XMLReaderFactory(new XMLBatchReader.XMLReaderConfig(this), scan));
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new XMLReaderFactory(formatConfig.getReaderConfig(this), scan));
}
}
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
index 6b26eabe26..b3af9d2ea3 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
@@ -55,7 +55,6 @@ public class XMLReader implements Closeable {
private final Stack<String> fieldNameStack;
private final Stack<TupleWriter> rowWriterStack;
private final int dataLevel;
- private final int maxRecords;
private final Map<String, XMLMap> nestedMapCollection;
private TupleWriter attributeWriter;
@@ -89,7 +88,7 @@ public class XMLReader implements Closeable {
ROW_ENDED
}
- public XMLReader(InputStream fsStream, int dataLevel, int maxRecords) throws XMLStreamException {
+ public XMLReader(InputStream fsStream, int dataLevel) throws XMLStreamException {
this.fsStream = fsStream;
XMLInputFactory inputFactory = XMLInputFactory.newInstance();
reader = inputFactory.createXMLEventReader(fsStream);
@@ -97,7 +96,6 @@ public class XMLReader implements Closeable {
rowWriterStack = new Stack<>();
nestedMapCollection = new HashMap<>();
this.dataLevel = dataLevel;
- this.maxRecords = maxRecords;
isSelfClosingEvent = false;
}
@@ -125,6 +123,7 @@ public class XMLReader implements Closeable {
}
+ @Override
public void close() {
if (fsStream != null) {
AutoCloseables.closeSilently(fsStream);
@@ -153,9 +152,6 @@ public class XMLReader implements Closeable {
if (!reader.hasNext()) {
// Stop reading if there are no more results
return false;
- } else if (rootRowWriter.limitReached(maxRecords)) {
- // Stop if the query limit has been reached
- return false;
}
// Iterate over XML events
@@ -286,6 +282,7 @@ public class XMLReader implements Closeable {
attributePrefix = XMLUtils.addField(attributePrefix, fieldName);
}
+ @SuppressWarnings("unchecked")
Iterator<Attribute> attributes = startElement.getAttributes();
if (attributes != null && attributes.hasNext()) {
writeAttributes(attributePrefix, attributes);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index 8e0aa684d4..da4f78c5db 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -96,7 +96,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
inStream = http.getInputStream();
// Initialize the XMLReader the reader
try {
- xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
+ xmlReader = new XMLReader(inStream, dataLevel);
resultLoader = negotiator.build();
if (implicitColumnsAreProjected()) {