You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/04/03 21:46:06 UTC

[drill] branch master updated: DRILL-8186: Convert XML format to EVF2 (#2512)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new e19081fe32 DRILL-8186: Convert XML format to EVF2 (#2512)
e19081fe32 is described below

commit e19081fe327c79164b75a3fc19332898c02bd0ca
Author: luoc <lu...@apache.org>
AuthorDate: Mon Apr 4 05:46:02 2022 +0800

    DRILL-8186: Convert XML format to EVF2 (#2512)
---
 .../drill/exec/store/xml/XMLBatchReader.java       | 48 +++++++++-------------
 .../drill/exec/store/xml/XMLFormatConfig.java      |  2 +-
 .../drill/exec/store/xml/XMLFormatPlugin.java      | 37 +++++++----------
 .../org/apache/drill/exec/store/xml/XMLReader.java |  9 ++--
 .../drill/exec/store/http/HttpXMLBatchReader.java  |  2 +-
 5 files changed, 40 insertions(+), 58 deletions(-)

diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
index efe1c0da39..52a2b6d903 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLBatchReader.java
@@ -18,35 +18,32 @@
 
 package org.apache.drill.exec.store.xml;
 
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.hadoop.mapred.FileSplit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.InputStream;
-
 
-public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class XMLBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(XMLBatchReader.class);
 
-  private FileSplit split;
-  private RowSetLoader rootRowWriter;
-  private CustomErrorContext errorContext;
+  private final FileDescrip file;
+  private final RowSetLoader rootRowWriter;
+  private final CustomErrorContext errorContext;
 
   private XMLReader reader;
-  private final int maxRecords;
   private final int dataLevel;
 
-
   static class XMLReaderConfig {
     final XMLFormatPlugin plugin;
     final int dataLevel;
@@ -57,20 +54,15 @@ public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
     }
   }
 
-  public XMLBatchReader(XMLReaderConfig readerConfig, EasySubScan scan) {
-    this.maxRecords = scan.getMaxRecords();
-    this.dataLevel = readerConfig.dataLevel;
-  }
+  public XMLBatchReader(XMLReaderConfig readerConfig, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    dataLevel = readerConfig.dataLevel;
+    file = negotiator.file();
 
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    split = negotiator.split();
     ResultSetLoader loader = negotiator.build();
-    errorContext = negotiator.parentErrorContext();
     rootRowWriter = loader.writer();
 
-    openFile(negotiator);
-    return true;
+    openFile();
   }
 
   @Override
@@ -80,18 +72,18 @@ public class XMLBatchReader implements ManagedReader<FileSchemaNegotiator> {
 
   @Override
   public void close() {
-    reader.close();
+    AutoCloseables.closeSilently(reader);
   }
 
-  private void openFile(FileScanFramework.FileSchemaNegotiator negotiator) {
+  private void openFile() {
     try {
-      InputStream fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
-      reader = new XMLReader(fsStream, dataLevel, maxRecords);
+      InputStream fsStream = file.fileSystem().openPossiblyCompressedStream(file.split().getPath());
+      reader = new XMLReader(fsStream, dataLevel);
       reader.open(rootRowWriter, errorContext);
     } catch (Exception e) {
       throw UserException
         .dataReadError(e)
-        .message(String.format("Failed to open input file: %s", split.getPath().toString()))
+        .message(String.format("Failed to open input file: %s", file.split().getPath().toString()))
         .addContext(errorContext)
         .addContext(e.getMessage())
         .build(logger);
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
index 0babf2091f..fbd44ad4d6 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatConfig.java
@@ -76,4 +76,4 @@ public class XMLFormatConfig implements FormatPluginConfig {
       .field("dataLevel", dataLevel)
       .toString();
   }
-}
\ No newline at end of file
+}
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
index 14ce06968b..5b270e53b6 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLFormatPlugin.java
@@ -21,21 +21,22 @@ package org.apache.drill.exec.store.xml;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
-import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin.ScanFrameworkVersion;
 import org.apache.hadoop.conf.Configuration;
 
 public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
 
   public static final String DEFAULT_NAME = "xml";
+  public static final String OPERATOR_TYPE = "XML_SUB_SCAN";
 
-  public static class XMLReaderFactory extends FileScanFramework.FileReaderFactory {
+  public static class XMLReaderFactory extends FileReaderFactory {
     private final XMLBatchReader.XMLReaderConfig readerConfig;
     private final EasySubScan scan;
 
@@ -45,8 +46,8 @@ public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
     }
 
     @Override
-    public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newReader() {
-      return new XMLBatchReader(readerConfig, scan);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new XMLBatchReader(readerConfig, scan, negotiator);
     }
   }
 
@@ -64,27 +65,19 @@ public class XMLFormatPlugin extends EasyFormatPlugin<XMLFormatConfig> {
         .writable(false)
         .blockSplittable(false)
         .compressible(true)
-        .supportsProjectPushdown(true)
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
-        .defaultName(DEFAULT_NAME)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .readerOperatorType(OPERATOR_TYPE)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
+        .supportsProjectPushdown(true)
+        .defaultName(XMLFormatPlugin.DEFAULT_NAME)
         .build();
   }
 
   @Override
-  public ManagedReader<? extends FileScanFramework.FileSchemaNegotiator> newBatchReader(
-    EasySubScan scan, OptionManager options) {
-    return new XMLBatchReader(formatConfig.getReaderConfig(this), scan);
-  }
-
-  @Override
-  protected FileScanFramework.FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new XMLReaderFactory(new XMLBatchReader.XMLReaderConfig(this), scan));
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new XMLReaderFactory(formatConfig.getReaderConfig(this), scan));
   }
 }
diff --git a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
index 6b26eabe26..b3af9d2ea3 100644
--- a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
+++ b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
@@ -55,7 +55,6 @@ public class XMLReader implements Closeable {
   private final Stack<String> fieldNameStack;
   private final Stack<TupleWriter> rowWriterStack;
   private final int dataLevel;
-  private final int maxRecords;
   private final Map<String, XMLMap> nestedMapCollection;
 
   private TupleWriter attributeWriter;
@@ -89,7 +88,7 @@ public class XMLReader implements Closeable {
     ROW_ENDED
   }
 
-  public XMLReader(InputStream fsStream, int dataLevel, int maxRecords) throws XMLStreamException {
+  public XMLReader(InputStream fsStream, int dataLevel) throws XMLStreamException {
     this.fsStream = fsStream;
     XMLInputFactory inputFactory = XMLInputFactory.newInstance();
     reader = inputFactory.createXMLEventReader(fsStream);
@@ -97,7 +96,6 @@ public class XMLReader implements Closeable {
     rowWriterStack = new Stack<>();
     nestedMapCollection = new HashMap<>();
     this.dataLevel = dataLevel;
-    this.maxRecords = maxRecords;
     isSelfClosingEvent = false;
   }
 
@@ -125,6 +123,7 @@ public class XMLReader implements Closeable {
   }
 
 
+  @Override
   public void close() {
     if (fsStream != null) {
       AutoCloseables.closeSilently(fsStream);
@@ -153,9 +152,6 @@ public class XMLReader implements Closeable {
     if (!reader.hasNext()) {
       // Stop reading if there are no more results
       return false;
-    } else if (rootRowWriter.limitReached(maxRecords)) {
-      // Stop if the query limit has been reached
-      return false;
     }
 
     // Iterate over XML events
@@ -286,6 +282,7 @@ public class XMLReader implements Closeable {
             attributePrefix = XMLUtils.addField(attributePrefix, fieldName);
           }
 
+          @SuppressWarnings("unchecked")
           Iterator<Attribute> attributes = startElement.getAttributes();
           if (attributes != null && attributes.hasNext()) {
             writeAttributes(attributePrefix, attributes);
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index 8e0aa684d4..da4f78c5db 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -96,7 +96,7 @@ public class HttpXMLBatchReader extends HttpBatchReader {
     inStream = http.getInputStream();
     // Initialize the XMLReader the reader
     try {
-      xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
+      xmlReader = new XMLReader(inStream, dataLevel);
       resultLoader = negotiator.build();
 
       if (implicitColumnsAreProjected()) {