You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by sz...@apache.org on 2023/05/24 18:12:09 UTC

[iceberg] branch master updated: Core: Support set system level properties with environmental variables (#5659)

This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b8941e031 Core: Support set system level properties with environmental variables (#5659)
4b8941e031 is described below

commit 4b8941e03141573c310110092d2f3feebd745e4f
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Thu May 25 02:11:59 2023 +0800

    Core: Support set system level properties with environmental variables (#5659)
---
 .palantir/revapi.yml                               |   4 +
 .../src/main/java/org/apache/iceberg/BaseScan.java |   2 +-
 .../java/org/apache/iceberg/ManifestFiles.java     |  15 +--
 .../java/org/apache/iceberg/SystemConfigs.java     | 123 +++++++++++++++++++++
 .../java/org/apache/iceberg/SystemProperties.java  |   8 +-
 .../java/org/apache/iceberg/util/ThreadPools.java  |  25 ++---
 .../org/apache/iceberg/TestManifestCaching.java    |   2 +-
 7 files changed, 146 insertions(+), 33 deletions(-)

diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index bc16672074..ef039675ac 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -442,6 +442,10 @@ acceptedBreaks:
     - code: "java.class.removed"
       old: "class org.apache.iceberg.actions.BaseExpireSnapshotsActionResult"
       justification: "Removing deprecations for 1.3.0"
+    - code: "java.field.noLongerConstant"
+      old: "field org.apache.iceberg.util.ThreadPools.WORKER_THREAD_POOL_SIZE_PROP"
+      new: "field org.apache.iceberg.util.ThreadPools.WORKER_THREAD_POOL_SIZE_PROP"
+      justification: "{Not break, they are same}"
     - code: "java.field.removedWithConstant"
       old: "field org.apache.iceberg.CatalogProperties.AUTH_DEFAULT_REFRESH_ENABLED"
       justification: "Removing deprecations for 1.3.0"
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 4f605985cf..9db72227ac 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -79,7 +79,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
       ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build();
 
   private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
-      SystemProperties.getBoolean(SystemProperties.SCAN_THREAD_POOL_ENABLED, true);
+      SystemConfigs.SCAN_THREAD_POOL_ENABLED.value();
 
   private final Table table;
   private final Schema schema;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 6cd9b5fa21..c23ab667a4 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -55,10 +55,11 @@ public class ManifestFiles {
 
   @VisibleForTesting
   static Caffeine<Object, Object> newManifestCacheBuilder() {
+    int maxSize = SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.value();
     return Caffeine.newBuilder()
         .weakKeys()
         .softValues()
-        .maximumSize(maxFileIO())
+        .maximumSize(maxSize)
         .removalListener(
             (io, contentCache, cause) ->
                 LOG.debug("Evicted {} from FileIO-level cache ({})", io, cause))
@@ -367,18 +368,6 @@ public class ManifestFiles {
     return io.newInputFile(path, length);
   }
 
-  private static int maxFileIO() {
-    String value = System.getProperty(SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO);
-    if (value != null) {
-      try {
-        return Integer.parseUnsignedInt(value);
-      } catch (NumberFormatException e) {
-        // will return the default
-      }
-    }
-    return SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
-  }
-
   static boolean cachingEnabled(FileIO io) {
     return PropertyUtil.propertyAsBoolean(
         io.properties(),
diff --git a/core/src/main/java/org/apache/iceberg/SystemConfigs.java b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
new file mode 100644
index 0000000000..74dff295d7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/SystemConfigs.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration properties that are controlled by Java system properties or environment variables.
+ */
+public class SystemConfigs {
+  private static final Logger LOG = LoggerFactory.getLogger(SystemConfigs.class);
+
+  private SystemConfigs() {}
+
+  /**
+   * Sets the size of the worker pool. The worker pool limits the number of tasks concurrently
+   * processing manifests in the base table implementation across all concurrent planning or commit
+   * operations.
+   */
+  public static final ConfigEntry<Integer> WORKER_THREAD_POOL_SIZE =
+      new ConfigEntry<>(
+          "iceberg.worker.num-threads",
+          "ICEBERG_WORKER_NUM_THREADS",
+          Math.max(2, Runtime.getRuntime().availableProcessors()),
+          Integer::parseUnsignedInt);
+
+  /** Whether to use the shared worker pool when planning table scans. */
+  public static final ConfigEntry<Boolean> SCAN_THREAD_POOL_ENABLED =
+      new ConfigEntry<>(
+          "iceberg.scan.plan-in-worker-pool",
+          "ICEBERG_SCAN_PLAN_IN_WORKER_POOL",
+          true,
+          Boolean::parseBoolean);
+
+  /**
+   * Maximum number of distinct {@link org.apache.iceberg.io.FileIO} that is allowed to have
+   * associated {@link org.apache.iceberg.io.ContentCache} in memory at a time.
+   */
+  public static final ConfigEntry<Integer> IO_MANIFEST_CACHE_MAX_FILEIO =
+      new ConfigEntry<>(
+          "iceberg.io.manifest.cache.fileio-max",
+          "ICEBERG_IO_MANIFEST_CACHE_FILEIO_MAX",
+          8,
+          Integer::parseUnsignedInt);
+
+  public static class ConfigEntry<T> {
+    private final String propertyKey;
+    private final String envKey;
+    private final T defaultValue;
+    private final Function<String, T> parseFunc;
+    private T lazyValue = null;
+
+    private ConfigEntry(
+        String propertyKey, String envKey, T defaultValue, Function<String, T> parseFunc) {
+      this.propertyKey = propertyKey;
+      this.envKey = envKey;
+      this.defaultValue = defaultValue;
+      this.parseFunc = parseFunc;
+    }
+
+    public final String propertyKey() {
+      return propertyKey;
+    }
+
+    public final String envKey() {
+      return envKey;
+    }
+
+    public final T defaultValue() {
+      return defaultValue;
+    }
+
+    public final T value() {
+      if (lazyValue == null) {
+        lazyValue = getValue();
+      }
+
+      return lazyValue;
+    }
+
+    private T getValue() {
+      String value = System.getProperty(propertyKey);
+      if (value == null) {
+        value = System.getenv(envKey);
+      }
+
+      if (value != null) {
+        try {
+          return parseFunc.apply(value);
+        } catch (Exception e) {
+          // will return the default value
+          LOG.error(
+              "Failed to parse the config value set by system property: {} or env variable: {}, "
+                  + "using the default value: {}",
+              propertyKey,
+              envKey,
+              defaultValue,
+              e);
+        }
+      }
+
+      return defaultValue;
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/SystemProperties.java b/core/src/main/java/org/apache/iceberg/SystemProperties.java
index 1d3c00b97c..7a83d530e2 100644
--- a/core/src/main/java/org/apache/iceberg/SystemProperties.java
+++ b/core/src/main/java/org/apache/iceberg/SystemProperties.java
@@ -18,7 +18,12 @@
  */
 package org.apache.iceberg;
 
-/** Configuration properties that are controlled by Java system properties. */
+/**
+ * Configuration properties that are controlled by Java system properties.
+ *
+ * @deprecated Use {@link SystemConfigs} instead; will be removed in 2.0.0
+ */
+@Deprecated
 public class SystemProperties {
 
   private SystemProperties() {}
@@ -46,6 +51,7 @@ public class SystemProperties {
     if (value != null) {
       return Boolean.parseBoolean(value);
     }
+
     return defaultValue;
   }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
index c4e0f31e21..a998e4262f 100644
--- a/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
+++ b/core/src/main/java/org/apache/iceberg/util/ThreadPools.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
-import org.apache.iceberg.SystemProperties;
+import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -32,12 +32,15 @@ public class ThreadPools {
 
   private ThreadPools() {}
 
+  /**
+   * @deprecated Use {@link SystemConfigs#WORKER_THREAD_POOL_SIZE WORKER_THREAD_POOL_SIZE} instead;
+   *     will be removed in 2.0.0
+   */
+  @Deprecated
   public static final String WORKER_THREAD_POOL_SIZE_PROP =
-      SystemProperties.WORKER_THREAD_POOL_SIZE_PROP;
+      SystemConfigs.WORKER_THREAD_POOL_SIZE.propertyKey();
 
-  public static final int WORKER_THREAD_POOL_SIZE =
-      getPoolSize(
-          WORKER_THREAD_POOL_SIZE_PROP, Math.max(2, Runtime.getRuntime().availableProcessors()));
+  public static final int WORKER_THREAD_POOL_SIZE = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
 
   private static final ExecutorService WORKER_POOL = newWorkerPool("iceberg-worker-pool");
 
@@ -79,18 +82,6 @@ public class ThreadPools {
     return new ScheduledThreadPoolExecutor(poolSize, newDaemonThreadFactory(namePrefix));
   }
 
-  private static int getPoolSize(String systemProperty, int defaultSize) {
-    String value = System.getProperty(systemProperty);
-    if (value != null) {
-      try {
-        return Integer.parseUnsignedInt(value);
-      } catch (NumberFormatException e) {
-        // will return the default
-      }
-    }
-    return defaultSize;
-  }
-
   private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d").build();
   }
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
index 78c593d8eb..3b67cb3e69 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestCaching.java
@@ -169,7 +169,7 @@ public class TestManifestCaching {
   public void testWeakFileIOReferenceCleanUp() {
     Cache<FileIO, ContentCache> manifestCache =
         ManifestFiles.newManifestCacheBuilder().executor(Runnable::run).build();
-    int maxIO = SystemProperties.IO_MANIFEST_CACHE_MAX_FILEIO_DEFAULT;
+    int maxIO = SystemConfigs.IO_MANIFEST_CACHE_MAX_FILEIO.defaultValue();
     FileIO firstIO = null;
     ContentCache firstCache = null;
     for (int i = 0; i < maxIO - 1; i++) {