You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by jn...@apache.org on 2017/06/03 04:46:02 UTC

[07/12] drill git commit: DRILL-5379: Set Hdfs Block Size based on Parquet Block Size

DRILL-5379: Set Hdfs Block Size based on Parquet Block Size

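Provide an option (store.parquet.writer.use_single_fs_block, default false) to specify the file system block size when a Parquet file is created.
When the option is enabled, the writer rounds the Parquet block size up to a multiple of 64K and passes it as the block size for the new file. This helps create Parquet files that occupy a single block on HDFS, which improves performance when those files are read.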

See DRILL-5379 for details.

closes #826


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9ab91ff2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9ab91ff2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9ab91ff2

Branch: refs/heads/master
Commit: 9ab91ff2640a8e89b92869d7dbb15acb9b602cd3
Parents: 9ba4af8
Author: Padma Penumarthy <pp...@yahoo.com>
Authored: Wed Apr 19 17:25:20 2017 -0700
Committer: Jinfeng Ni <jn...@apache.org>
Committed: Fri Jun 2 21:43:14 2017 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +++
 .../server/options/SystemOptionManager.java     |  1 +
 .../exec/store/parquet/ParquetFormatPlugin.java |  2 ++
 .../exec/store/parquet/ParquetRecordWriter.java | 22 ++++++++++++++++----
 4 files changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9ab91ff2/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ba98532..7c681c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -167,6 +167,9 @@ public interface ExecConstants {
   OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
   String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
   OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+  String PARQUET_WRITER_USE_SINGLE_FS_BLOCK = "store.parquet.writer.use_single_fs_block";
+  OptionValidator PARQUET_WRITER_USE_SINGLE_FS_BLOCK_VALIDATOR = new BooleanValidator(
+    PARQUET_WRITER_USE_SINGLE_FS_BLOCK, false);
   String PARQUET_PAGE_SIZE = "store.parquet.page-size";
   OptionValidator PARQUET_PAGE_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGE_SIZE, 1024*1024);
   String PARQUET_DICT_PAGE_SIZE = "store.parquet.dictionary.page-size";

http://git-wip-us.apache.org/repos/asf/drill/blob/9ab91ff2/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4f7ecc2..8492f36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -96,6 +96,7 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
       ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK_VALIDATOR,
       ExecConstants.PARQUET_PAGE_SIZE_VALIDATOR,
       ExecConstants.PARQUET_DICT_PAGE_SIZE_VALIDATOR,
       ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR,

http://git-wip-us.apache.org/repos/asf/drill/blob/9ab91ff2/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index f17d414..0eb4665 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -139,6 +139,8 @@ public class ParquetFormatPlugin implements FormatPlugin{
     options.put(FileSystem.FS_DEFAULT_NAME_KEY, ((FileSystemConfig)writer.getStorageConfig()).connection);
 
     options.put(ExecConstants.PARQUET_BLOCK_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_BLOCK_SIZE).num_val.toString());
+    options.put(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK,
+      context.getOptions().getOption(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK).bool_val.toString());
     options.put(ExecConstants.PARQUET_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_PAGE_SIZE).num_val.toString());
     options.put(ExecConstants.PARQUET_DICT_PAGE_SIZE, context.getOptions().getOption(ExecConstants.PARQUET_DICT_PAGE_SIZE).num_val.toString());
 

http://git-wip-us.apache.org/repos/asf/drill/blob/9ab91ff2/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 7536d78..bc495a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import static java.lang.Math.ceil;
 import static java.lang.Math.max;
 import static java.lang.Math.min;
 
@@ -77,6 +78,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
   private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
+  private static final int BLOCKSIZE_MULTIPLE = 64 * 1024;
 
   public static final String DRILL_VERSION_PROPERTY = "drill.version";
   public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
@@ -89,6 +91,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
   private int pageSize;
   private int dictionaryPageSize;
   private boolean enableDictionary = false;
+  private boolean useSingleFSBlock = false;
   private CompressionCodecName codec = CompressionCodecName.SNAPPY;
   private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
   private CodecFactory codecFactory;
@@ -156,6 +159,12 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
     }
 
     enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
+    useSingleFSBlock = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK));
+
+    if (useSingleFSBlock) {
+      // Round up blockSize to multiple of 64K.
+      blockSize = (int)ceil((double)blockSize/BLOCKSIZE_MULTIPLE) * BLOCKSIZE_MULTIPLE;
+    }
   }
 
   private boolean containsComplexVectors(BatchSchema schema) {
@@ -380,14 +389,19 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
 
       // since ParquetFileWriter will overwrite empty output file (append is not supported)
       // we need to re-apply file permission
-      parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+      if (useSingleFSBlock) {
+        // Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
+        // Currently, this is supported only by filesystems included in
+        // BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
+        // For other filesystems, it uses default blockSize configured for the file system.
+        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
+      } else {
+        parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
+      }
       storageStrategy.applyToFile(fs, path);
-
       parquetFileWriter.start();
     }
-
     recordCount++;
-
     checkBlockSizeReached();
   }