You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/09/14 07:28:56 UTC
[09/15] drill git commit: DRILL-3720: Avro Record Reader should process Avro files by per block basis
DRILL-3720: Avro Record Reader should process Avro files by per block basis
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8f4ca6ee
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8f4ca6ee
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8f4ca6ee
Branch: refs/heads/master
Commit: 8f4ca6ee851d1f504bd8f34a6329580a177d361d
Parents: 2214894
Author: Kamesh <ka...@gmail.com>
Authored: Fri Aug 28 15:20:44 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Sep 13 21:58:33 2015 -0700
----------------------------------------------------------------------
.../drill/exec/store/avro/AvroFormatPlugin.java | 4 ++--
.../drill/exec/store/avro/AvroRecordReader.java | 14 ++++++++++++--
2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/8f4ca6ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 30c45fa..5a73a9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -47,7 +47,7 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
}
public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
- super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+ super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("avro"), "avro");
}
@Override
@@ -57,7 +57,7 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
@Override
public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
- return new AvroRecordReader(context, fileWork.getPath(), dfs, columns);
+ return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/8f4ca6ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index a52fd22..271c8e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -65,6 +65,8 @@ public class AvroRecordReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
private final Path hadoop;
+ private final long start;
+ private final long end;
private DrillBuf buffer;
private VectorContainerWriter writer;
@@ -77,17 +79,23 @@ public class AvroRecordReader extends AbstractRecordReader {
public AvroRecordReader(final FragmentContext fragmentContext,
final String inputPath,
+ final long start,
+ final long length,
final FileSystem fileSystem,
final List<SchemaPath> projectedColumns) {
- this(fragmentContext, inputPath, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+ this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
}
public AvroRecordReader(final FragmentContext fragmentContext,
final String inputPath,
+ final long start,
+ final long length,
final FileSystem fileSystem,
List<SchemaPath> projectedColumns, final int defaultBatchSize) {
hadoop = new Path(inputPath);
+ this.start = start;
+ this.end = start + length;
buffer = fragmentContext.getManagedBuffer();
this.fs = fileSystem;
@@ -101,6 +109,8 @@ public class AvroRecordReader extends AbstractRecordReader {
try {
reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
+ logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
+ reader.sync(this.start);
} catch (IOException e) {
throw new ExecutionSetupException(e);
}
@@ -125,7 +135,7 @@ public class AvroRecordReader extends AbstractRecordReader {
// XXX - Implement batch size
- for (GenericContainer container = null; reader.hasNext(); recordCount++) {
+ for (GenericContainer container = null; reader.hasNext() && !reader.pastSync(end); recordCount++) {
writer.setPosition(recordCount);
container = reader.next(container);
processRecord(container, container.getSchema());