Posted to commits@iceberg.apache.org by op...@apache.org on 2022/03/21 10:01:29 UTC

[iceberg] branch master updated: ORC: Read configured key-value from table properties. (#4291)

This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b4d6489  ORC: Read configured key-value from table properties. (#4291)
b4d6489 is described below

commit b4d64895ec279a705e7e115c7141d12a479c3a6f
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Mar 21 18:01:10 2022 +0800

    ORC: Read configured key-value from table properties. (#4291)
---
 .../java/org/apache/iceberg/TableProperties.java   |  4 +
 orc/src/main/java/org/apache/iceberg/orc/ORC.java  | 99 ++++++++++++++--------
 2 files changed, 68 insertions(+), 35 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 1d3d414..ee25d90 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -143,6 +143,10 @@ public class TableProperties {
   public static final String DELETE_ORC_BLOCK_SIZE_BYTES = "write.delete.orc.block-size-bytes";
   public static final long ORC_BLOCK_SIZE_BYTES_DEFAULT = 256L * 1024 * 1024; // 256 MB
 
+  public static final String ORC_WRITE_BATCH_SIZE = "write.orc.vectorized.batch-size";
+  public static final String DELETE_ORC_WRITE_BATCH_SIZE = "write.delete.orc.vectorized.batch-size";
+  public static final int ORC_WRITE_BATCH_SIZE_DEFAULT = 1024;
+
   public static final String SPLIT_SIZE = "read.split.target-size";
   public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
 
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 4c6b704..12458d1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcFile.ReaderOptions;
@@ -67,6 +68,10 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 @SuppressWarnings("checkstyle:AbbreviationAsWordInName")
 public class ORC {
 
+  /**
+   * @deprecated use {@link TableProperties#ORC_WRITE_BATCH_SIZE} instead
+   */
+  @Deprecated
   private static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";
 
   private ORC() {
@@ -78,20 +83,16 @@ public class ORC {
 
   public static class WriteBuilder {
     private final OutputFile file;
-    private final Configuration conf;
     private Schema schema = null;
     private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc;
     private Map<String, byte[]> metadata = Maps.newHashMap();
     private MetricsConfig metricsConfig;
-    private Function<Configuration, Context> createContextFunc = Context::dataContext;
+    private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
+    private final Map<String, String> config = Maps.newLinkedHashMap();
+    private boolean overwrite = false;
 
     private WriteBuilder(OutputFile file) {
       this.file = file;
-      if (file instanceof HadoopOutputFile) {
-        this.conf = new Configuration(((HadoopOutputFile) file).getConf());
-      } else {
-        this.conf = new Configuration();
-      }
     }
 
     public WriteBuilder forTable(Table table) {
@@ -119,7 +120,7 @@ public class ORC {
     }
 
     public WriteBuilder set(String property, String value) {
-      conf.set(property, value);
+      config.put(property, value);
       return this;
     }
 
@@ -129,7 +130,7 @@ public class ORC {
     }
 
     public WriteBuilder setAll(Map<String, String> properties) {
-      properties.forEach(conf::set);
+      config.putAll(properties);
       return this;
     }
 
@@ -143,7 +144,7 @@ public class ORC {
     }
 
     public WriteBuilder overwrite(boolean enabled) {
-      OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, enabled);
+      this.overwrite = enabled;
       return this;
     }
 
@@ -153,7 +154,7 @@ public class ORC {
     }
 
     // supposed to always be a private method used strictly by data and delete write builders
-    private WriteBuilder createContextFunc(Function<Configuration, Context> newCreateContextFunc) {
+    private WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
       this.createContextFunc = newCreateContextFunc;
       return this;
     }
@@ -161,22 +162,38 @@ public class ORC {
     public <D> FileAppender<D> build() {
       Preconditions.checkNotNull(schema, "Schema is required");
 
+      Configuration conf;
+      if (file instanceof HadoopOutputFile) {
+        conf = ((HadoopOutputFile) file).getConf();
+      } else {
+        conf = new Configuration();
+      }
+
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        conf.set(entry.getKey(), entry.getValue());
+      }
+
+      // for compatibility
+      if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(TableProperties.ORC_WRITE_BATCH_SIZE) == null) {
+        config.put(TableProperties.ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
+      }
+
       // Map Iceberg properties to pass down to the ORC writer
-      Context context = createContextFunc.apply(conf);
-      long stripeSize = context.stripeSize();
-      long blockSize = context.blockSize();
+      Context context = createContextFunc.apply(config);
+      conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), context.stripeSize());
+      conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), context.blockSize());
 
-      conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
-      conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), blockSize);
+      conf.setBoolean(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), overwrite);
 
       return new OrcFileAppender<>(schema,
           this.file, createWriterFunc, conf, metadata,
-          conf.getInt(VECTOR_ROW_BATCH_SIZE, VectorizedRowBatch.DEFAULT_SIZE), metricsConfig);
+          context.vectorizedRowBatchSize(), metricsConfig);
     }
 
     private static class Context {
       private final long stripeSize;
       private final long blockSize;
+      private final int vectorizedRowBatchSize;
 
       public long stripeSize() {
         return stripeSize;
@@ -186,35 +203,47 @@ public class ORC {
         return blockSize;
       }
 
-      private Context(long stripeSize, long blockSize) {
+      public int vectorizedRowBatchSize() {
+        return vectorizedRowBatchSize;
+      }
+
+      private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize) {
         this.stripeSize = stripeSize;
         this.blockSize = blockSize;
+        this.vectorizedRowBatchSize = vectorizedRowBatchSize;
       }
 
-      static Context dataContext(Configuration conf) {
-        long stripeSize = TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT;
-        stripeSize = conf.getLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
-        stripeSize = conf.getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), stripeSize);
-        stripeSize = conf.getLong(TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
+      static Context dataContext(Map<String, String> config) {
+        long stripeSize = PropertyUtil.propertyAsLong(config, OrcConf.STRIPE_SIZE.getAttribute(),
+            TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT);
+        stripeSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
+        Preconditions.checkArgument(stripeSize > 0, "Stripe size must be > 0");
 
-        long blockSize = TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
-        blockSize = conf.getLong(OrcConf.BLOCK_SIZE.getAttribute(), blockSize);
-        blockSize = conf.getLong(OrcConf.BLOCK_SIZE.getHiveConfName(), blockSize);
-        blockSize = conf.getLong(TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
+        long blockSize = PropertyUtil.propertyAsLong(config, OrcConf.BLOCK_SIZE.getAttribute(),
+            TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+        blockSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
+        Preconditions.checkArgument(blockSize > 0, "Block size must be > 0");
 
-        return new Context(stripeSize, blockSize);
+        int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
+            TableProperties.ORC_WRITE_BATCH_SIZE, TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT);
+        Preconditions.checkArgument(vectorizedRowBatchSize > 0, "VectorizedRow batch size must be > 0");
+
+        return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
       }
 
-      static Context deleteContext(Configuration conf) {
-        Context dataContext = dataContext(conf);
+      static Context deleteContext(Map<String, String> config) {
+        Context dataContext = dataContext(config);
+
+        long stripeSize = PropertyUtil.propertyAsLong(config,
+            TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
 
-        long stripeSize =
-            conf.getLong(TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
+        long blockSize = PropertyUtil.propertyAsLong(config,
+            TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());
 
-        long blockSize =
-            conf.getLong(TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.stripeSize());
+        int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
+            TableProperties.DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());
 
-        return new Context(stripeSize, blockSize);
+        return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
       }
     }
   }
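
For context, the TableProperties hunk above adds two table-level keys, write.orc.vectorized.batch-size and write.delete.orc.vectorized.batch-size, both defaulting to 1024. Below is a minimal sketch of how a table owner might set them through the standard UpdateProperties API; the table handle and the chosen values are assumptions for illustration, not part of this commit.

    import org.apache.iceberg.Table;

    public class SetOrcBatchSize {
      // Sets the new ORC batch-size properties on an existing table (values are illustrative).
      static void configure(Table table) {
        table.updateProperties()
            .set("write.orc.vectorized.batch-size", "2048")         // TableProperties.ORC_WRITE_BATCH_SIZE
            .set("write.delete.orc.vectorized.batch-size", "1024")  // TableProperties.DELETE_ORC_WRITE_BATCH_SIZE
            .commit();
      }
    }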
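On the ORC.java side, WriteBuilder now buffers set()/setAll() values in a plain Map and only copies them onto a Hadoop Configuration inside build(), where the legacy iceberg.orc.vectorbatch.size key is honoured as a deprecated fallback when the new table property is absent. A hedged usage sketch follows, assuming ORC.write(OutputFile) as the entry point, that forTable(table) calls setAll(table.properties()), and that GenericOrcWriter::buildWriter from the iceberg-data module matches the BiFunction writer-factory signature; none of these are shown in this diff.

    import org.apache.iceberg.Table;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.io.OutputFile;
    import org.apache.iceberg.orc.ORC;

    public class OrcWriteExample {
      // Builds an ORC appender whose batch size comes from table properties, with a builder-level override.
      static FileAppender<Record> newAppender(Table table, OutputFile outputFile) {
        return ORC.write(outputFile)
            .forTable(table)                                  // assumed to copy schema, metrics config and table properties
            .createWriterFunc(GenericOrcWriter::buildWriter)  // generic Record writer factory (assumed available)
            .set("write.orc.vectorized.batch-size", "2048")   // overrides the value copied from table properties
            .overwrite(true)
            .build();
      }
    }

Because the builder keeps these values in a map, the later set() call simply overwrites the entry added when the table properties were copied in, and dataContext()/deleteContext() then validate that the resolved stripe size, block size and batch size are all positive before the appender is created.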