You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/22 15:06:17 UTC

[drill] branch master updated: DRILL-8171: Convert SequenceFile to EVF2 (#2498)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e97f5c  DRILL-8171: Convert SequenceFile to EVF2 (#2498)
4e97f5c is described below

commit 4e97f5c7d9875dfda5785c6f01bc137973183d2b
Author: luoc <lu...@apache.org>
AuthorDate: Tue Mar 22 23:06:10 2022 +0800

    DRILL-8171: Convert SequenceFile to EVF2 (#2498)
---
 .../easy/sequencefile/SequenceFileBatchReader.java | 127 ++++++++-------------
 .../sequencefile/SequenceFileFormatPlugin.java     |  31 ++---
 2 files changed, 60 insertions(+), 98 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
index b5d80f4..d7510c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
@@ -21,12 +21,14 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
 import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -36,7 +38,6 @@ import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.vector.accessor.ScalarWriter;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -46,31 +47,47 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SequenceFileBatchReader implements ManagedReader {
 
   private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
 
-  private final SequenceFileFormatConfig config;
-  private final EasySubScan scan;
-  private FileSplit split;
-  private String queryUserName;
-  private String opUserName;
+  private final FileDescrip file;
+  private final String opUserName;
+  private final String queryUserName;
   public static final String KEY_SCHEMA = "binary_key";
   public static final String VALUE_SCHEMA = "binary_value";
   private final BytesWritable key = new BytesWritable();
   private final BytesWritable value = new BytesWritable();
-  private final int maxRecords;
-  private RowSetLoader loader;
-  private ScalarWriter keyWriter;
-  private ScalarWriter valueWriter;
+  private final RowSetLoader loader;
+  private final ScalarWriter keyWriter;
+  private final ScalarWriter valueWriter;
   private RecordReader<BytesWritable, BytesWritable> reader;
-  private CustomErrorContext errorContext;
-  private Stopwatch watch;
+  private final CustomErrorContext errorContext;
+  private final Stopwatch watch;
 
-  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
-    this.config = config;
-    this.scan = scan;
-    this.maxRecords = scan.getMaxRecords();
+  public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+    errorContext = negotiator.parentErrorContext();
+    file = negotiator.file();
+    opUserName = scan.getUserName();
+    queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
+
+    negotiator.tableSchema(defineMetadata(), true);
+    logger.trace("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+    try {
+      processReader(negotiator);
+    } catch (ExecutionSetupException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Failure in initial sequencefile reader")
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    ResultSetLoader setLoader = negotiator.build();
+    loader = setLoader.writer();
+    keyWriter = loader.scalar(KEY_SCHEMA);
+    valueWriter = loader.scalar(VALUE_SCHEMA);
+    watch = Stopwatch.createStarted();
   }
 
   private TupleMetadata defineMetadata() {
@@ -82,12 +99,7 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
 
   private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
     final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
-    split = negotiator.split();
-    // After defined the split, We should also define the errorContext.
-    errorContext = negotiator.parentErrorContext();
-    opUserName = scan.getUserName();
-    queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
-    final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+    final JobConf jobConf = new JobConf(file.fileSystem().getConf());
     jobConf.setInputFormat(inputFormat.getClass());
     reader = getRecordReader(inputFormat, jobConf);
   }
@@ -100,46 +112,22 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
       return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
         @Override
         public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
-          return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+          return inputFormat.getRecordReader(file.split(), jobConf, Reporter.NULL);
         }
       });
     } catch (IOException | InterruptedException e) {
       throw UserException
-              .dataReadError(e)
-              .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d. "
-               + e.getMessage(), split.getPath(), split.getStart(), split.getLength())
-              .addContext(errorContext)
-              .build(logger);
-    }
-  }
-
-  @Override
-  public boolean open(FileSchemaNegotiator negotiator) {
-    negotiator.tableSchema(defineMetadata(), true);
-    logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
-    // open Sequencefile
-    try {
-      processReader(negotiator);
-    } catch (ExecutionSetupException e) {
-      throw UserException
         .dataReadError(e)
-        .message("Failure in initial sequencefile reader. " + e.getMessage())
+        .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+          file.split().getPath(), file.split().getStart(), file.split().getLength())
+        .addContext(e.getMessage())
         .addContext(errorContext)
         .build(logger);
     }
-    ResultSetLoader setLoader = negotiator.build();
-    loader = setLoader.writer();
-    keyWriter = loader.scalar(KEY_SCHEMA);
-    valueWriter = loader.scalar(VALUE_SCHEMA);
-    return true;
   }
 
   @Override
   public boolean next() {
-    int recordCount = 0;
-    if (watch == null) {
-      watch = Stopwatch.createStarted();
-    }
     try {
       while (!loader.isFull()) {
         if (reader.next(key, value)) {
@@ -147,40 +135,25 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
           keyWriter.setBytes(key.getBytes(), key.getLength());
           valueWriter.setBytes(value.getBytes(), value.getLength());
           loader.save();
-          ++ recordCount;
         } else {
-          logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
-          return false;
-        }
-        if (loader.limitReached(maxRecords)) {
+          logger.debug("Reader fetch {} records in {} ms", loader.rowCount(), watch.elapsed(TimeUnit.MILLISECONDS));
+          watch.stop();
           return false;
         }
       }
+      return true;
     } catch (IOException e) {
       throw UserException
-              .dataReadError(e)
-              .message("An error occurred while reading the next key/value pair from the sequencefile reader. "
-               + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
+        .dataReadError(e)
+        .message("An error occurred while reading the next key/value pair")
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
     }
-    return true;
   }
 
   @Override
   public void close() {
-    try {
-      // The reader not support AutoCloseable, must be closed by invoke close().
-      if (reader != null) {
-        reader.close();
-        reader = null;
-      }
-    } catch (IOException e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed closing sequencefile reader. " + e.getMessage())
-              .addContext(errorContext)
-              .build(logger);
-    }
+    AutoCloseables.closeSilently(reader);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index b04742a..1396673 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -17,16 +17,15 @@
  */
 package org.apache.drill.exec.store.easy.sequencefile;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
 import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
@@ -52,7 +51,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
         .extensions(pluginConfig.getExtensions())
         .fsConf(fsConf)
         .readerOperatorType(OPERATOR_TYPE)
-        .scanVersion(ScanFrameworkVersion.EVF_V1)
+        .scanVersion(ScanFrameworkVersion.EVF_V2)
         .supportsLimitPushdown(true)
         .supportsProjectPushdown(true)
         .defaultName(SequenceFileFormatConfig.NAME)
@@ -70,24 +69,14 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
     }
 
     @Override
-    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
-      return new SequenceFileBatchReader(config, scan);
+    public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+      return new SequenceFileBatchReader(config, scan, negotiator);
     }
   }
 
   @Override
-  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
-      throws ExecutionSetupException {
-    return new SequenceFileBatchReader(formatConfig, scan);
-  }
-
-  @Override
-  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
-    FileScanBuilder builder = new FileScanBuilder();
-    builder.setReaderFactory(new SequenceFileReaderFactory(formatConfig, scan));
-
-    initScanBuilder(builder, scan);
+  protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
     builder.nullType(Types.optional(MinorType.VARCHAR));
-    return builder;
+    builder.readerFactory(new SequenceFileReaderFactory(formatConfig, scan));
   }
 }