You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/24 18:12:09 UTC
[iceberg] branch master updated: Core: Support set system level properties with environmental variables (#5659)
This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4b8941e031 Core: Support set system level properties with environmental variables (#5659)
4b8941e031 is described below
commit 4b8941e03141573c310110092d2f3feebd745e4f
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Thu May 25 02:11:59 2023 +0800
Core: Support set system level properties with environmental variables (#5659)
---
.palantir/revapi.yml | 4 +
.../src/main/java/org/apache/iceberg/BaseScan.java | 2 +-
.../java/org/apache/iceberg/ManifestFiles.java | 15 +--
.../java/org/apache/iceberg/SystemConfigs.java | 123 +++++++++++++++++++++
.../java/org/apache/iceberg/SystemProperties.java | 8 +-
.../java/org/apache/iceberg/util/ThreadPools.java | 25 ++---
.../org/apache/iceberg/TestManifestCaching.java | 2 +-
7 files changed, 146 insertions(+), 33 deletions(-)
diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index bc16672074..ef039675ac 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -442,6 +442,10 @@ acceptedBreaks:
- code: "java.class.removed"
old: "class org.apache.iceberg.actions.BaseExpireSnapshotsActionResult"
justification: "Removing deprecations for 1.3.0"
+ - code: "java.field.noLongerConstant"
+ old: "field org.apache.iceberg.util.ThreadPools.WORKER_THREAD_POOL_SIZE_PROP"
+ new: "field org.apache.iceberg.util.ThreadPools.WORKER_THREAD_POOL_SIZE_PROP"
+ justification: "{Not break, they are same}"
- code: "java.field.removedWithConstant"
old: "field org.apache.iceberg.CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED"
justification: "Removing deprecations for 1.3.0"
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 4f605985cf..9db72227ac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -79,7 +79,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
- SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
+ SystemConfigs.SCAN_THREAD_POOL_ENABLED.value();
private final Table table;
private final Schema schema;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 6cd9b5fa21..c23ab667a4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -55,10 +55,11 @@ public class ManifestFiles {
@VisibleForTesting
static Caffeine<Object, Object> newManifestCacheBuilder() {
+ int maxSize = SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.value();
return Caffeine.newBuilder()
.weakKeys()
.softValues()
- .maximumSize(maxFileIO())
+ .maximumSize(maxSize)
.removalListener(
(io, contentCache, cause) ->
LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause))
@@ -367,18 +368,6 @@ public class ManifestFiles {
return io.newInputFile(path, length);
}
- private static int maxFileIO() {
- String value = System.getProperty(SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO);
- if (value != null) {
- try {
- return Integer.parseUnsignedInt(value);
- } catch (NumberFormatException e) {
- // will return the default
- }
- }
- return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
- }
-
static boolean cachingEnabled(FileIO io) {
return PropertyUtil.propertyAsBoolean(
io.properties(),
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
new file mode 100644
index 0000000000..74dff295d7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration properties that are controlled by Java system properties or environment
+ * variables.
+ *
+ * <p>Each {@link ConfigEntry} is resolved by checking its Java system property first, then its
+ * environment variable, and finally falling back to its default value.
+ */
+public class SystemConfigs {
+  private static final Logger LOG = LoggerFactory.getLogger(SystemConfigs.class);
+
+  private SystemConfigs() {}
+
+  /**
+   * Sets the size of the worker pool. The worker pool limits the number of tasks concurrently
+   * processing manifests in the base table implementation across all concurrent planning or commit
+   * operations.
+   */
+  public static final ConfigEntry<Integer> WORKER_THREAD_POOL_SIZE =
+      new ConfigEntry<>(
+          "iceberg.worker.num-threads",
+          "ICEBERG_WORKER_NUM_THREADS",
+          Math.max(2, Runtime.getRuntime().availableProcessors()),
+          SystemConfigs::parseNonNegativeInt);
+
+  /** Whether to use the shared worker pool when planning table scans. */
+  public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
+      new ConfigEntry<>(
+          "iceberg.scan.plan-in-worker-pool",
+          "ICEBERG_SCAN_PLAN_IN_WORKER_POOL",
+          true,
+          Boolean::parseBoolean);
+
+  /**
+   * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
+   * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
+   */
+  public static final ConfigEntry<Integer> IO_MANIFEST_CACHE_MAX_FILEIO =
+      new ConfigEntry<>(
+          "iceberg.io.manifest.cache.fileio-max",
+          "ICEBERG_IO_MANIFEST_CACHE_FILEIO_MAX",
+          8,
+          SystemConfigs::parseNonNegativeInt);
+
+  /**
+   * Parses a non-negative decimal integer.
+   *
+   * <p>{@link Integer#parseUnsignedInt(String)} accepts inputs up to 2^32 - 1 and returns the ones
+   * above {@link Integer#MAX_VALUE} as negative ints. Those are rejected here so that an oversized
+   * setting falls back to the entry's default instead of being handed on as a negative size.
+   *
+   * @param text the raw configuration text
+   * @return the parsed value, always {@code >= 0}
+   * @throws NumberFormatException if the text is not an unsigned integer or exceeds {@link
+   *     Integer#MAX_VALUE}
+   */
+  private static Integer parseNonNegativeInt(String text) {
+    int parsed = Integer.parseUnsignedInt(text);
+    if (parsed < 0) {
+      throw new NumberFormatException(
+          String.format("Value out of range for a non-negative int: %s", text));
+    }
+
+    return parsed;
+  }
+
+  /**
+   * A single configuration entry that can be set through a Java system property or an environment
+   * variable.
+   *
+   * @param <T> the type of the parsed configuration value
+   */
+  public static class ConfigEntry<T> {
+    private final String propertyKey;
+    private final String envKey;
+    private final T defaultValue;
+    private final Function<String, T> parseFunc;
+    // volatile: entries are shared static singletons that may be read from any thread
+    private volatile T lazyValue = null;
+
+    private ConfigEntry(
+        String propertyKey, String envKey, T defaultValue, Function<String, T> parseFunc) {
+      this.propertyKey = propertyKey;
+      this.envKey = envKey;
+      this.defaultValue = defaultValue;
+      this.parseFunc = parseFunc;
+    }
+
+    /** Returns the name of the Java system property that sets this entry. */
+    public final String propertyKey() {
+      return propertyKey;
+    }
+
+    /** Returns the name of the environment variable that sets this entry. */
+    public final String envKey() {
+      return envKey;
+    }
+
+    /** Returns the value used when the entry is not set or its value cannot be parsed. */
+    public final T defaultValue() {
+      return defaultValue;
+    }
+
+    /**
+     * Returns the effective value of this entry, resolving it on first use and caching the result.
+     *
+     * <p>Concurrent first calls may each resolve the value; every caller still receives a non-null
+     * result as long as the resolved value itself is non-null.
+     */
+    public final T value() {
+      // read the field once so the value that was null-checked is the value that is returned
+      T current = lazyValue;
+      if (current == null) {
+        current = getValue();
+        lazyValue = current;
+      }
+
+      return current;
+    }
+
+    /**
+     * Resolves the value from the system property, then the environment variable, then the
+     * default. A value that fails to parse is logged and replaced by the default.
+     */
+    private T getValue() {
+      String value = System.getProperty(propertyKey);
+      if (value == null) {
+        value = System.getenv(envKey);
+      }
+
+      if (value != null) {
+        try {
+          return parseFunc.apply(value);
+        } catch (Exception e) {
+          // will return the default value
+          LOG.error(
+              "Failed to parse the config value set by system property: {} or env variable: {}, "
+                  + "using the default value: {}",
+              propertyKey,
+              envKey,
+              defaultValue,
+              e);
+        }
+      }
+
+      return defaultValue;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SystemProperties.java b/core/src/main/java/org/apache/iceberg/SystemProperties.java
index 1d3c00b97c..7a83d530e2 100644
--- a/core/src/main/java/org/apache/iceberg/SystemProperties.java
+++ b/core/src/main/java/org/apache/iceberg/SystemProperties.java
@@ -18,7 +18,12 @@
*/
package org.apache.iceberg;
-/** Configuration properties that are controlled by Java system properties. */
+/**
+ * Configuration properties that are controlled by Java system properties.
+ *
+ * @deprecated Use {@link SystemConfigs} instead; will be removed in 2.0.0
+ */
+@Deprecated
public class SystemProperties {
private SystemProperties() {}
@@ -46,6 +51,7 @@ public class SystemProperties {
if (value != null) {
return Boolean.parseBoolean(value);
}
+
return defaultValue;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index c4e0f31e21..a998e4262f 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.iceberg.SystemProperties;
+import org.apache.iceberg.SystemConfigs;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -32,12 +32,15 @@ public class ThreadPools {
private ThreadPools() {}
+ /**
+ * @deprecated Use {@link SystemConfigs#WORKER_THREAD_POOL_SIZE WORKER_THREAD_POOL_SIZE} instead;
+ * will be removed in 2.0.0
+ */
+ @Deprecated
public static final String WORKER_THREAD_POOL_SIZE_PROP =
- SystemProperties.WORKER_THREAD_POOL_SIZE_PROP;
+ SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey();
- public static final int WORKER_THREAD_POOL_SIZE =
- getPoolSize(
- WORKER_THREAD_POOL_SIZE_PROP, Math.max(2, Runtime.getRuntime().availableProcessors()));
+ public static final int WORKER_THREAD_POOL_SIZE = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
private static final ExecutorService WORKER_POOL = newWorkerPool("iceberg-worker-pool");
@@ -79,18 +82,6 @@ public class ThreadPools {
return new ScheduledThreadPoolExecutor(poolSize, newDaemonThreadFactory(namePrefix));
}
- private static int getPoolSize(String systemProperty, int defaultSize) {
- String value = System.getProperty(systemProperty);
- if (value != null) {
- try {
- return Integer.parseUnsignedInt(value);
- } catch (NumberFormatException e) {
- // will return the default
- }
- }
- return defaultSize;
- }
-
private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build();
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
index 78c593d8eb..3b67cb3e69 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
@@ -169,7 +169,7 @@ public class TestManifestCaching {
public void testWeakFileIOReferenceCleanUp() {
Cache<FileIO, ContentCache> manifestCache =
ManifestFiles.newManifestCacheBuilder().executor(Runnable::run).build();
- int maxIO = SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
+ int maxIO = SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.defaultValue();
FileIO firstIO = null;
ContentCache firstCache = null;
for (int i = 0; i < maxIO - 1; i++) {