You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/14 18:41:20 UTC

[iceberg] branch master updated: Flink: Add engine name, version to EnvironmentContext (#6184)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new a9d36309ea Flink: Add engine name, version to EnvironmentContext (#6184)
a9d36309ea is described below

commit a9d36309ea2a51e7f4c5c4bf7461a7bdb0e5870d
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Mon Nov 14 19:41:15 2022 +0100

    Flink: Add engine name, version to EnvironmentContext (#6184)
---
 flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
 flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
 flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java | 4 ++++
 3 files changed, 12 insertions(+)

diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..248577c2af 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+    EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+    EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.14");
   }
 
   @Override
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..327ec73002 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+    EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+    EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.15");
   }
 
   @Override
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 8779e4656d..92dfbf7fb3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -53,6 +53,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.util.StringUtils;
 import org.apache.iceberg.CachingCatalog;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.EnvironmentContext;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
@@ -113,6 +114,9 @@ public class FlinkCatalog extends AbstractCatalog {
     asNamespaceCatalog =
         originalCatalog instanceof SupportsNamespaces ? (SupportsNamespaces) originalCatalog : null;
     closeable = originalCatalog instanceof Closeable ? (Closeable) originalCatalog : null;
+
+    EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "flink");
+    EnvironmentContext.put(EnvironmentContext.ENGINE_VERSION, "1.16");
   }
 
   @Override