You are viewing a plain-text version of this content. The canonical link to it was provided as a hyperlink, which is not preserved in this plain-text copy.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/09/14 07:28:56 UTC

[09/15] drill git commit: DRILL-3720: Avro Record Reader should process Avro files on a per-block basis

DRILL-3720: Avro Record Reader should process Avro files on a per-block basis


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/8f4ca6ee
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/8f4ca6ee
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/8f4ca6ee

Branch: refs/heads/master
Commit: 8f4ca6ee851d1f504bd8f34a6329580a177d361d
Parents: 2214894
Author: Kamesh <ka...@gmail.com>
Authored: Fri Aug 28 15:20:44 2015 +0530
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Sun Sep 13 21:58:33 2015 -0700

----------------------------------------------------------------------
 .../drill/exec/store/avro/AvroFormatPlugin.java       |  4 ++--
 .../drill/exec/store/avro/AvroRecordReader.java       | 14 ++++++++++++--
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/8f4ca6ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
index 30c45fa..5a73a9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroFormatPlugin.java
@@ -47,7 +47,7 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
   }
 
   public AvroFormatPlugin(String name, DrillbitContext context, Configuration fsConf, StoragePluginConfig config, AvroFormatConfig formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true, false, false, false, Lists.newArrayList("avro"), "avro");
+    super(name, context, fsConf, config, formatPluginConfig, true, false, true, false, Lists.newArrayList("avro"), "avro");
   }
 
   @Override
@@ -57,7 +57,7 @@ public class AvroFormatPlugin extends EasyFormatPlugin<AvroFormatConfig> {
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException {
-    return new AvroRecordReader(context, fileWork.getPath(), dfs, columns);
+    return new AvroRecordReader(context, fileWork.getPath(), fileWork.getStart(), fileWork.getLength(), dfs, columns);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/8f4ca6ee/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index a52fd22..271c8e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -65,6 +65,8 @@ public class AvroRecordReader extends AbstractRecordReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
 
   private final Path hadoop;
+  private final long start;
+  private final long end;
   private DrillBuf buffer;
   private VectorContainerWriter writer;
 
@@ -77,17 +79,23 @@ public class AvroRecordReader extends AbstractRecordReader {
 
   public AvroRecordReader(final FragmentContext fragmentContext,
                           final String inputPath,
+                          final long start,
+                          final long length,
                           final FileSystem fileSystem,
                           final List<SchemaPath> projectedColumns) {
-    this(fragmentContext, inputPath, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
+    this(fragmentContext, inputPath, start, length, fileSystem, projectedColumns, DEFAULT_BATCH_SIZE);
   }
 
   public AvroRecordReader(final FragmentContext fragmentContext,
                           final String inputPath,
+                          final long start,
+                          final long length,
                           final FileSystem fileSystem,
                           List<SchemaPath> projectedColumns, final int defaultBatchSize) {
 
     hadoop = new Path(inputPath);
+    this.start = start;
+    this.end = start + length;
     buffer = fragmentContext.getManagedBuffer();
     this.fs = fileSystem;
 
@@ -101,6 +109,8 @@ public class AvroRecordReader extends AbstractRecordReader {
 
     try {
       reader = new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
+      logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
+      reader.sync(this.start);
     } catch (IOException e) {
       throw new ExecutionSetupException(e);
     }
@@ -125,7 +135,7 @@ public class AvroRecordReader extends AbstractRecordReader {
 
       // XXX - Implement batch size
 
-      for (GenericContainer container = null; reader.hasNext(); recordCount++) {
+      for (GenericContainer container = null; reader.hasNext() && !reader.pastSync(end); recordCount++) {
         writer.setPosition(recordCount);
         container = reader.next(container);
         processRecord(container, container.getSchema());