You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by op...@apache.org on 2022/03/21 10:01:29 UTC
[iceberg] branch master updated: ORC: Read configured key-value from table properties. (#4291)
This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b4d6489 ORC: Read configured key-value from table properties. (#4291)
b4d6489 is described below
commit b4d64895ec279a705e7e115c7141d12a479c3a6f
Author: liliwei <hi...@gmail.com>
AuthorDate: Mon Mar 21 18:01:10 2022 +0800
ORC: Read configured key-value from table properties. (#4291)
---
.../java/org/apache/iceberg/TableProperties.java | 4 +
orc/src/main/java/org/apache/iceberg/orc/ORC.java | 99 ++++++++++++++--------
2 files changed, 68 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index 1d3d414..ee25d90 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -143,6 +143,10 @@ public class TableProperties {
public static final String DELETE_ORC_BLOCK_SIZE_BYTES = "write.delete.orc.block-size-bytes";
public static final long ORC_BLOCK_SIZE_BYTES_DEFAULT = 256L * 1024 * 1024; // 256 MB
+ public static final String ORC_WRITE_BATCH_SIZE = "write.orc.vectorized.batch-size";
+ public static final String DELETE_ORC_WRITE_BATCH_SIZE = "write.delete.orc.vectorized.batch-size";
+ public static final int ORC_WRITE_BATCH_SIZE_DEFAULT = 1024;
+
public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB
diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORC.java b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
index 4c6b704..12458d1 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/ORC.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -57,6 +57,7 @@ import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFile.ReaderOptions;
@@ -67,6 +68,10 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ORC {
+ /**
+ * @deprecated use {@link TableProperties#ORC_WRITE_BATCH_SIZE} instead
+ */
+ @Deprecated
private static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";
private ORC() {
@@ -78,20 +83,16 @@ public class ORC {
public static class WriteBuilder {
private final OutputFile file;
- private final Configuration conf;
private Schema schema = null;
private BiFunction<Schema, TypeDescription, OrcRowWriter<?>> createWriterFunc;
private Map<String, byte[]> metadata = Maps.newHashMap();
private MetricsConfig metricsConfig;
- private Function<Configuration, Context> createContextFunc = Context::dataContext;
+ private Function<Map<String, String>, Context> createContextFunc = Context::dataContext;
+ private final Map<String, String> config = Maps.newLinkedHashMap();
+ private boolean overwrite = false;
private WriteBuilder(OutputFile file) {
this.file = file;
- if (file instanceof HadoopOutputFile) {
- this.conf = new Configuration(((HadoopOutputFile) file).getConf());
- } else {
- this.conf = new Configuration();
- }
}
public WriteBuilder forTable(Table table) {
@@ -119,7 +120,7 @@ public class ORC {
}
public WriteBuilder set(String property, String value) {
- conf.set(property, value);
+ config.put(property, value);
return this;
}
@@ -129,7 +130,7 @@ public class ORC {
}
public WriteBuilder setAll(Map<String, String> properties) {
- properties.forEach(conf::set);
+ config.putAll(properties);
return this;
}
@@ -143,7 +144,7 @@ public class ORC {
}
public WriteBuilder overwrite(boolean enabled) {
- OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, enabled);
+ this.overwrite = enabled;
return this;
}
@@ -153,7 +154,7 @@ public class ORC {
}
// supposed to always be a private method used strictly by data and delete write builders
- private WriteBuilder createContextFunc(Function<Configuration, Context> newCreateContextFunc) {
+ private WriteBuilder createContextFunc(Function<Map<String, String>, Context> newCreateContextFunc) {
this.createContextFunc = newCreateContextFunc;
return this;
}
@@ -161,22 +162,38 @@ public class ORC {
public <D> FileAppender<D> build() {
Preconditions.checkNotNull(schema, "Schema is required");
+ Configuration conf;
+ if (file instanceof HadoopOutputFile) {
+ conf = ((HadoopOutputFile) file).getConf();
+ } else {
+ conf = new Configuration();
+ }
+
+ for (Map.Entry<String, String> entry : config.entrySet()) {
+ conf.set(entry.getKey(), entry.getValue());
+ }
+
+ // for compatibility
+ if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(TableProperties.ORC_WRITE_BATCH_SIZE) == null) {
+ config.put(TableProperties.ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
+ }
+
// Map Iceberg properties to pass down to the ORC writer
- Context context = createContextFunc.apply(conf);
- long stripeSize = context.stripeSize();
- long blockSize = context.blockSize();
+ Context context = createContextFunc.apply(config);
+ conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), context.stripeSize());
+ conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), context.blockSize());
- conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
- conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), blockSize);
+ conf.setBoolean(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), overwrite);
return new OrcFileAppender<>(schema,
this.file, createWriterFunc, conf, metadata,
- conf.getInt(VECTOR_ROW_BATCH_SIZE, VectorizedRowBatch.DEFAULT_SIZE), metricsConfig);
+ context.vectorizedRowBatchSize(), metricsConfig);
}
private static class Context {
private final long stripeSize;
private final long blockSize;
+ private final int vectorizedRowBatchSize;
public long stripeSize() {
return stripeSize;
@@ -186,35 +203,47 @@ public class ORC {
return blockSize;
}
- private Context(long stripeSize, long blockSize) {
+ public int vectorizedRowBatchSize() {
+ return vectorizedRowBatchSize;
+ }
+
+ private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize) {
this.stripeSize = stripeSize;
this.blockSize = blockSize;
+ this.vectorizedRowBatchSize = vectorizedRowBatchSize;
}
- static Context dataContext(Configuration conf) {
- long stripeSize = TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT;
- stripeSize = conf.getLong(OrcConf.STRIPE_SIZE.getAttribute(), stripeSize);
- stripeSize = conf.getLong(OrcConf.STRIPE_SIZE.getHiveConfName(), stripeSize);
- stripeSize = conf.getLong(TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
+ static Context dataContext(Map<String, String> config) {
+ long stripeSize = PropertyUtil.propertyAsLong(config, OrcConf.STRIPE_SIZE.getAttribute(),
+ TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT);
+ stripeSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
+ Preconditions.checkArgument(stripeSize > 0, "Stripe size must be > 0");
- long blockSize = TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
- blockSize = conf.getLong(OrcConf.BLOCK_SIZE.getAttribute(), blockSize);
- blockSize = conf.getLong(OrcConf.BLOCK_SIZE.getHiveConfName(), blockSize);
- blockSize = conf.getLong(TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
+ long blockSize = PropertyUtil.propertyAsLong(config, OrcConf.BLOCK_SIZE.getAttribute(),
+ TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
+ blockSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
+ Preconditions.checkArgument(blockSize > 0, "Block size must be > 0");
- return new Context(stripeSize, blockSize);
+ int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
+ TableProperties.ORC_WRITE_BATCH_SIZE, TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT);
+ Preconditions.checkArgument(vectorizedRowBatchSize > 0, "VectorizedRow batch size must be > 0");
+
+ return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
}
- static Context deleteContext(Configuration conf) {
- Context dataContext = dataContext(conf);
+ static Context deleteContext(Map<String, String> config) {
+ Context dataContext = dataContext(config);
+
+ long stripeSize = PropertyUtil.propertyAsLong(config,
+ TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
- long stripeSize =
- conf.getLong(TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
+ long blockSize = PropertyUtil.propertyAsLong(config,
+ TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());
- long blockSize =
- conf.getLong(TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.stripeSize());
+ int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
+ TableProperties.DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());
- return new Context(stripeSize, blockSize);
+ return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
}
}
}