You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/14 18:41:20 UTC
[iceberg] branch master updated: Flink: Add engine name, version to EnvironmentContext (#6184)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new a9d36309ea Flink: Add engine name, version to EnvironmentContext (#6184)
a9d36309ea is described below
commit a9d36309ea2a51e7f4c5c4bf7461a7bdb0e5870d
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon Nov 14 19:41:15 2022 +0100
Flink: Add engine name, version to EnvironmentContext (#6184)
---
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
3 files changed, 12 insertions(+)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..248577c2af 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+ EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+ EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.14");
}
@Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..327ec73002 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+ EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+ EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.15");
}
@Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..92dfbf7fb3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.StringUtils;
import org.apache.iceberg.CachingCatalog;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
asNamespaceCatalog =
originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+ EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+ EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.16");
}
@Override