You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/03/22 15:06:17 UTC
[drill] branch master updated: DRILL-8171: Convert SequenceFile to EVF2 (#2498)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 4e97f5c DRILL-8171: Convert SequenceFile to EVF2 (#2498)
4e97f5c is described below
commit 4e97f5c7d9875dfda5785c6f01bc137973183d2b
Author: luoc <lu...@apache.org>
AuthorDate: Tue Mar 22 23:06:10 2022 +0800
DRILL-8171: Convert SequenceFile to EVF2 (#2498)
---
.../easy/sequencefile/SequenceFileBatchReader.java | 127 ++++++++-------------
.../sequencefile/SequenceFileFormatPlugin.java | 31 ++---
2 files changed, 60 insertions(+), 98 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
index b5d80f4..d7510c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileBatchReader.java
@@ -21,12 +21,14 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileDescrip;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
@@ -36,7 +38,6 @@ import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
@@ -46,31 +47,47 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiator> {
+public class SequenceFileBatchReader implements ManagedReader {
private static final Logger logger = LoggerFactory.getLogger(SequenceFileBatchReader.class);
- private final SequenceFileFormatConfig config;
- private final EasySubScan scan;
- private FileSplit split;
- private String queryUserName;
- private String opUserName;
+ private final FileDescrip file;
+ private final String opUserName;
+ private final String queryUserName;
public static final String KEY_SCHEMA = "binary_key";
public static final String VALUE_SCHEMA = "binary_value";
private final BytesWritable key = new BytesWritable();
private final BytesWritable value = new BytesWritable();
- private final int maxRecords;
- private RowSetLoader loader;
- private ScalarWriter keyWriter;
- private ScalarWriter valueWriter;
+ private final RowSetLoader loader;
+ private final ScalarWriter keyWriter;
+ private final ScalarWriter valueWriter;
private RecordReader<BytesWritable, BytesWritable> reader;
- private CustomErrorContext errorContext;
- private Stopwatch watch;
+ private final CustomErrorContext errorContext;
+ private final Stopwatch watch;
- public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan) {
- this.config = config;
- this.scan = scan;
- this.maxRecords = scan.getMaxRecords();
+ public SequenceFileBatchReader(SequenceFileFormatConfig config, EasySubScan scan, FileSchemaNegotiator negotiator) {
+ errorContext = negotiator.parentErrorContext();
+ file = negotiator.file();
+ opUserName = scan.getUserName();
+ queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
+
+ negotiator.tableSchema(defineMetadata(), true);
+ logger.trace("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
+ try {
+ processReader(negotiator);
+ } catch (ExecutionSetupException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Failure in initial sequencefile reader")
+ .addContext(e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ ResultSetLoader setLoader = negotiator.build();
+ loader = setLoader.writer();
+ keyWriter = loader.scalar(KEY_SCHEMA);
+ valueWriter = loader.scalar(VALUE_SCHEMA);
+ watch = Stopwatch.createStarted();
}
private TupleMetadata defineMetadata() {
@@ -82,12 +99,7 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
private void processReader(FileSchemaNegotiator negotiator) throws ExecutionSetupException {
final SequenceFileAsBinaryInputFormat inputFormat = new SequenceFileAsBinaryInputFormat();
- split = negotiator.split();
- // After defined the split, We should also define the errorContext.
- errorContext = negotiator.parentErrorContext();
- opUserName = scan.getUserName();
- queryUserName = negotiator.context().getFragmentContext().getQueryUserName();
- final JobConf jobConf = new JobConf(negotiator.fileSystem().getConf());
+ final JobConf jobConf = new JobConf(file.fileSystem().getConf());
jobConf.setInputFormat(inputFormat.getClass());
reader = getRecordReader(inputFormat, jobConf);
}
@@ -100,46 +112,22 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
return ugi.doAs(new PrivilegedExceptionAction<RecordReader<BytesWritable, BytesWritable>>() {
@Override
public RecordReader<BytesWritable, BytesWritable> run() throws Exception {
- return inputFormat.getRecordReader(split, jobConf, Reporter.NULL);
+ return inputFormat.getRecordReader(file.split(), jobConf, Reporter.NULL);
}
});
} catch (IOException | InterruptedException e) {
throw UserException
- .dataReadError(e)
- .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d. "
- + e.getMessage(), split.getPath(), split.getStart(), split.getLength())
- .addContext(errorContext)
- .build(logger);
- }
- }
-
- @Override
- public boolean open(FileSchemaNegotiator negotiator) {
- negotiator.tableSchema(defineMetadata(), true);
- logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), scan.getColumns());
- // open Sequencefile
- try {
- processReader(negotiator);
- } catch (ExecutionSetupException e) {
- throw UserException
.dataReadError(e)
- .message("Failure in initial sequencefile reader. " + e.getMessage())
+ .message("Error in creating sequencefile reader for file: %s, start: %d, length: %d",
+ file.split().getPath(), file.split().getStart(), file.split().getLength())
+ .addContext(e.getMessage())
.addContext(errorContext)
.build(logger);
}
- ResultSetLoader setLoader = negotiator.build();
- loader = setLoader.writer();
- keyWriter = loader.scalar(KEY_SCHEMA);
- valueWriter = loader.scalar(VALUE_SCHEMA);
- return true;
}
@Override
public boolean next() {
- int recordCount = 0;
- if (watch == null) {
- watch = Stopwatch.createStarted();
- }
try {
while (!loader.isFull()) {
if (reader.next(key, value)) {
@@ -147,40 +135,25 @@ public class SequenceFileBatchReader implements ManagedReader<FileSchemaNegotiat
keyWriter.setBytes(key.getBytes(), key.getLength());
valueWriter.setBytes(value.getBytes(), value.getLength());
loader.save();
- ++ recordCount;
} else {
- logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
- return false;
- }
- if (loader.limitReached(maxRecords)) {
+ logger.debug("Reader fetch {} records in {} ms", loader.rowCount(), watch.elapsed(TimeUnit.MILLISECONDS));
+ watch.stop();
return false;
}
}
+ return true;
} catch (IOException e) {
throw UserException
- .dataReadError(e)
- .message("An error occurred while reading the next key/value pair from the sequencefile reader. "
- + e.getMessage())
- .addContext(errorContext)
- .build(logger);
+ .dataReadError(e)
+ .message("An error occurred while reading the next key/value pair")
+ .addContext(e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
}
- return true;
}
@Override
public void close() {
- try {
- // The reader not support AutoCloseable, must be closed by invoke close().
- if (reader != null) {
- reader.close();
- reader = null;
- }
- } catch (IOException e) {
- throw UserException
- .dataReadError(e)
- .message("Failed closing sequencefile reader. " + e.getMessage())
- .addContext(errorContext)
- .build(logger);
- }
+ AutoCloseables.closeSilently(reader);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index b04742a..1396673 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -17,16 +17,15 @@
*/
package org.apache.drill.exec.store.easy.sequencefile;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
-import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
-import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.v3.ManagedReader.EarlyEofException;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileScanLifecycleBuilder;
+import org.apache.drill.exec.physical.impl.scan.v3.file.FileSchemaNegotiator;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
@@ -52,7 +51,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
.extensions(pluginConfig.getExtensions())
.fsConf(fsConf)
.readerOperatorType(OPERATOR_TYPE)
- .scanVersion(ScanFrameworkVersion.EVF_V1)
+ .scanVersion(ScanFrameworkVersion.EVF_V2)
.supportsLimitPushdown(true)
.supportsProjectPushdown(true)
.defaultName(SequenceFileFormatConfig.NAME)
@@ -70,24 +69,14 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newReader() {
- return new SequenceFileBatchReader(config, scan);
+ public ManagedReader newReader(FileSchemaNegotiator negotiator) throws EarlyEofException {
+ return new SequenceFileBatchReader(config, scan, negotiator);
}
}
@Override
- public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(EasySubScan scan, OptionManager options)
- throws ExecutionSetupException {
- return new SequenceFileBatchReader(formatConfig, scan);
- }
-
- @Override
- protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) throws ExecutionSetupException {
- FileScanBuilder builder = new FileScanBuilder();
- builder.setReaderFactory(new SequenceFileReaderFactory(formatConfig, scan));
-
- initScanBuilder(builder, scan);
+ protected void configureScan(FileScanLifecycleBuilder builder, EasySubScan scan) {
builder.nullType(Types.optional(MinorType.VARCHAR));
- return builder;
+ builder.readerFactory(new SequenceFileReaderFactory(formatConfig, scan));
}
}