You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2022/11/15 16:47:47 UTC
[iceberg] branch master updated: Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 5195d280da Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)
5195d280da is described below
commit 5195d280da11dd5a250d551b595eaca462ebb173
Author: Kunni <ku...@dtstack.com>
AuthorDate: Wed Nov 16 00:47:41 2022 +0800
Flink: Add 'cache.expiration-interval-ms' option to FlinkCatalog (#6111)
---
docs/flink-getting-started.md | 3 ++-
.../org/apache/iceberg/flink/FlinkCatalog.java | 8 +++++--
.../apache/iceberg/flink/FlinkCatalogFactory.java | 25 +++++++++++++++++++---
.../flink/TestFlinkCatalogTablePartitions.java | 5 ++---
.../org/apache/iceberg/flink/FlinkCatalog.java | 8 +++++--
.../apache/iceberg/flink/FlinkCatalogFactory.java | 25 +++++++++++++++++++---
.../flink/TestFlinkCatalogTablePartitions.java | 5 ++---
.../org/apache/iceberg/flink/FlinkCatalog.java | 8 +++++--
.../apache/iceberg/flink/FlinkCatalogFactory.java | 25 +++++++++++++++++++---
.../flink/TestFlinkCatalogTablePartitions.java | 5 ++---
10 files changed, 92 insertions(+), 25 deletions(-)
diff --git a/docs/flink-getting-started.md b/docs/flink-getting-started.md
index 3bf9e3e5ea..b01252beae 100644
--- a/docs/flink-getting-started.md
+++ b/docs/flink-getting-started.md
@@ -215,7 +215,8 @@ The following properties can be set globally and are not limited to a specific c
* `catalog-type`: `hive` or `hadoop` for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. (Optional)
* `catalog-impl`: The fully-qualified class name of a custom catalog implementation. Must be set if `catalog-type` is unset. (Optional)
* `property-version`: Version number to describe the property version. This property can be used for backwards compatibility in case the property format changes. The current property version is `1`. (Optional)
-* `cache-enabled`: Whether to enable catalog cache, default value is `true`
+* `cache-enabled`: Whether to enable catalog cache, default value is `true`. (Optional)
+* `cache.expiration-interval-ms`: How long catalog entries are locally cached, in milliseconds. Negative values such as `-1` disable expiration; a value of `0` is not allowed. The default value is `-1`. (Optional)
### Hive catalog
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 248577c2af..718233dfec 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
- boolean cacheEnabled) {
+ boolean cacheEnabled,
+ long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;
Catalog originalCatalog = catalogLoader.loadCatalog();
- icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+ icebergCatalog =
+ cacheEnabled
+ ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+ : originalCatalog;
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
/**
* A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String BASE_NAMESPACE = "base-namespace";
- public static final String CACHE_ENABLED = "cache-enabled";
public static final String TYPE = "type";
public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
}
- boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
- return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+ boolean cacheEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+ long cacheExpirationIntervalMs =
+ PropertyUtil.propertyAsLong(
+ properties,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+ Preconditions.checkArgument(
+ cacheExpirationIntervalMs != 0,
+ "%s is not allowed to be 0.",
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+ return new FlinkCatalog(
+ name,
+ defaultDatabase,
+ baseNamespace,
+ catalogLoader,
+ cacheEnabled,
+ cacheExpirationIntervalMs);
}
private static Configuration mergeHiveConf(
diff --git a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.14/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
*/
package org.apache.iceberg.flink;
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
super(catalogName, baseNamespace);
this.format = format;
- config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+ config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
}
@Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 327ec73002..b29951a81c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
- boolean cacheEnabled) {
+ boolean cacheEnabled,
+ long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;
Catalog originalCatalog = catalogLoader.loadCatalog();
- icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+ icebergCatalog =
+ cacheEnabled
+ ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+ : originalCatalog;
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
/**
* A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String BASE_NAMESPACE = "base-namespace";
- public static final String CACHE_ENABLED = "cache-enabled";
public static final String TYPE = "type";
public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
}
- boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
- return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+ boolean cacheEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+ long cacheExpirationIntervalMs =
+ PropertyUtil.propertyAsLong(
+ properties,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+ Preconditions.checkArgument(
+ cacheExpirationIntervalMs != 0,
+ "%s is not allowed to be 0.",
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+ return new FlinkCatalog(
+ name,
+ defaultDatabase,
+ baseNamespace,
+ catalogLoader,
+ cacheEnabled,
+ cacheExpirationIntervalMs);
}
private static Configuration mergeHiveConf(
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
*/
package org.apache.iceberg.flink;
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
super(catalogName, baseNamespace);
this.format = format;
- config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+ config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
}
@Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 92dfbf7fb3..a9c2a2e761 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -103,14 +103,18 @@ public class FlinkCatalog extends AbstractCatalog {
String defaultDatabase,
Namespace baseNamespace,
CatalogLoader catalogLoader,
- boolean cacheEnabled) {
+ boolean cacheEnabled,
+ long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;
Catalog originalCatalog = catalogLoader.loadCatalog();
- icebergCatalog = cacheEnabled ? CachingCatalog.wrap(originalCatalog) : originalCatalog;
+ icebergCatalog =
+ cacheEnabled
+ ? CachingCatalog.wrap(originalCatalog, cacheExpirationIntervalMs)
+ : originalCatalog;
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
index 19eda210ae..ff1c0a0591 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalogFactory.java
@@ -36,6 +36,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.PropertyUtil;
/**
* A Flink Catalog factory implementation that creates {@link FlinkCatalog}.
@@ -69,7 +70,6 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String BASE_NAMESPACE = "base-namespace";
- public static final String CACHE_ENABLED = "cache-enabled";
public static final String TYPE = "type";
public static final String PROPERTY_VERSION = "property-version";
@@ -145,8 +145,27 @@ public class FlinkCatalogFactory implements CatalogFactory {
baseNamespace = Namespace.of(properties.get(BASE_NAMESPACE).split("\\."));
}
- boolean cacheEnabled = Boolean.parseBoolean(properties.getOrDefault(CACHE_ENABLED, "true"));
- return new FlinkCatalog(name, defaultDatabase, baseNamespace, catalogLoader, cacheEnabled);
+ boolean cacheEnabled =
+ PropertyUtil.propertyAsBoolean(
+ properties, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
+
+ long cacheExpirationIntervalMs =
+ PropertyUtil.propertyAsLong(
+ properties,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_OFF);
+ Preconditions.checkArgument(
+ cacheExpirationIntervalMs != 0,
+ "%s is not allowed to be 0.",
+ CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS);
+
+ return new FlinkCatalog(
+ name,
+ defaultDatabase,
+ baseNamespace,
+ catalogLoader,
+ cacheEnabled,
+ cacheExpirationIntervalMs);
}
private static Configuration mergeHiveConf(
diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
index 839700f501..e5332e8f30 100644
--- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
+++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTablePartitions.java
@@ -18,14 +18,13 @@
*/
package org.apache.iceberg.flink;
-import static org.apache.iceberg.flink.FlinkCatalogFactory.CACHE_ENABLED;
-
import java.util.List;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -63,7 +62,7 @@ public class TestFlinkCatalogTablePartitions extends FlinkCatalogTestBase {
String catalogName, Namespace baseNamespace, FileFormat format, boolean cacheEnabled) {
super(catalogName, baseNamespace);
this.format = format;
- config.put(CACHE_ENABLED, String.valueOf(cacheEnabled));
+ config.put(CatalogProperties.CACHE_ENABLED, String.valueOf(cacheEnabled));
}
@Override