You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2022/11/15 16:47:47 UTC

[iceberg] branch master updated: Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)

This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 5195d280da Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)
5195d280da is described below

commit 5195d280da11dd5a250d551b595eaca462ebb173
Author: Kunni <ku...@dtstack.com>
AuthorDate: Wed Nov 16 00:47:41 2022 +0800

    Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)
---
 docs/flink-getting-started.md                      |  3 ++-
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  8 +++++--
 .../apache/iceberg/flink/FlinkCatalogFactory.java  | 25 +++++++++++++++++++---
 .../flink/TestFlinkCatalogTablePartitions.java     |  5 ++---
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  8 +++++--
 .../apache/iceberg/flink/FlinkCatalogFactory.java  | 25 +++++++++++++++++++---
 .../flink/TestFlinkCatalogTablePartitions.java     |  5 ++---
 .../org/apache/iceberg/flink/FlinkCatalog.java     |  8 +++++--
 .../apache/iceberg/flink/FlinkCatalogFactory.java  | 25 +++++++++++++++++++---
 .../flink/TestFlinkCatalogTablePartitions.java     |  5 ++---
 10 files changed, 92 insertions(+), 25 deletions(-)

diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 3bf9e3e5ea..b01252beae 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -215,7 +215,8 @@ The following properties can be set globally and are not limited to a specific c
 * `catalog-type`: `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
 * `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. (Optional)
 * `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
-* `cache-enabled`: Whether to enable catalog cache, default value is `true`
+* `cache-enabled`: Whether to enable catalog cache, default value is `true`. (Optional)
+* `cache.expiration-interval-ms`: How long catalog entries are locally cached, in milliseconds. Negative values such as `-1` disable expiration; a value of `0` is not allowed. Only takes effect when `cache-enabled` is `true`. Default value is `-1`. (Optional)
 
 ### Hive catalog
 
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 248577c2af..718233dfec 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
-      boolean cacheEnabled) {
+      boolean cacheEnabled,
+      long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
     Catalog originalCatalog = catalogLoader.loadCatalog();
-    icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+    icebergCatalog =
+        cacheEnabled
+            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+            : originalCatalog;
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
 
 /**
  * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
-  public static final String CACHE_ENABLED = "cache-enabled";
 
   public static final String TYPE = "type";
   public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
       baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
     }
 
-    boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
-    return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+    boolean cacheEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+    long cacheExpirationIntervalMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+    Preconditions.checkArgument(
+        cacheExpirationIntervalMs != 0,
+        "%s is not allowed to be 0.",
+        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+    return new FlinkCatalog(
+        name,
+        defaultDatabase,
+        baseNamespace,
+        catalogLoader,
+        cacheEnabled,
+        cacheExpirationIntervalMs);
   }
 
   private static Configuration mergeHiveConf(
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
  */
 package org.apache.iceberg.flink;
 
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
 import java.util.List;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
       String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
     super(catalogName, baseNamespace);
     this.format = format;
-    config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
   }
 
   @Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 327ec73002..b29951a81c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
-      boolean cacheEnabled) {
+      boolean cacheEnabled,
+      long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
     Catalog originalCatalog = catalogLoader.loadCatalog();
-    icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+    icebergCatalog =
+        cacheEnabled
+            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+            : originalCatalog;
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
 
 /**
  * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
-  public static final String CACHE_ENABLED = "cache-enabled";
 
   public static final String TYPE = "type";
   public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
       baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
     }
 
-    boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
-    return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+    boolean cacheEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+    long cacheExpirationIntervalMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+    Preconditions.checkArgument(
+        cacheExpirationIntervalMs != 0,
+        "%s is not allowed to be 0.",
+        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+    return new FlinkCatalog(
+        name,
+        defaultDatabase,
+        baseNamespace,
+        catalogLoader,
+        cacheEnabled,
+        cacheExpirationIntervalMs);
   }
 
   private static Configuration mergeHiveConf(
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
  */
 package org.apache.iceberg.flink;
 
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
 import java.util.List;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
       String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
     super(catalogName, baseNamespace);
     this.format = format;
-    config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
   }
 
   @Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 92dfbf7fb3..a9c2a2e761 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
       String defaultDatabase,
       Namespace baseNamespace,
       CatalogLoader catalogLoader,
-      boolean cacheEnabled) {
+      boolean cacheEnabled,
+      long cacheExpirationIntervalMs) {
     super(catalogName, defaultDatabase);
     this.catalogLoader = catalogLoader;
     this.baseNamespace = baseNamespace;
     this.cacheEnabled = cacheEnabled;
 
     Catalog originalCatalog = catalogLoader.loadCatalog();
-    icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+    icebergCatalog =
+        cacheEnabled
+            ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+            : originalCatalog;
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
 
 /**
  * A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
   public static final String DEFAULT_DATABASE = "default-database";
   public static final String DEFAULT_DATABASE_NAME = "default";
   public static final String BASE_NAMESPACE = "base-namespace";
-  public static final String CACHE_ENABLED = "cache-enabled";
 
   public static final String TYPE = "type";
   public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
       baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
     }
 
-    boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
-    return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+    boolean cacheEnabled =
+        PropertyUtil.propertyAsBoolean(
+            properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+    long cacheExpirationIntervalMs =
+        PropertyUtil.propertyAsLong(
+            properties,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+            CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+    Preconditions.checkArgument(
+        cacheExpirationIntervalMs != 0,
+        "%s is not allowed to be 0.",
+        CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+    return new FlinkCatalog(
+        name,
+        defaultDatabase,
+        baseNamespace,
+        catalogLoader,
+        cacheEnabled,
+        cacheExpirationIntervalMs);
   }
 
   private static Configuration mergeHiveConf(
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
  */
 package org.apache.iceberg.flink;
 
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
 import java.util.List;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
       String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
     super(catalogName, baseNamespace);
     this.format = format;
-    config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+    config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
   }
 
   @Override