You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/07/28 04:53:11 UTC

[flink-table-store] branch release-0.2 updated: [FLINK-28700] Table store sink fails to commit for Flink 1.14 batch job

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.2
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.2 by this push:
     new 77230ddf [FLINK-28700] Table store sink fails to commit for Flink 1.14 batch job
77230ddf is described below

commit 77230ddf70f809c578ab54c8e2881be774da044c
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Jul 28 12:53:07 2022 +0800

    [FLINK-28700] Table store sink fails to commit for Flink 1.14 batch job
    
    This closes #244
---
 .../utils/StreamExecutionEnvironmentUtils.java     | 48 ++++++++++++++++++++++
 .../utils/StreamExecutionEnvironmentUtils.java     | 30 ++++++++++++++
 .../table/store/connector/sink/StoreSink.java      |  4 +-
 3 files changed, 81 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-connector/src/main/1.14.5/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java b/flink-table-store-connector/src/main/1.14.5/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 00000000..813f1b41
--- /dev/null
+++ b/flink-table-store-connector/src/main/1.14.5/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.utils;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.lang.reflect.Field;
+
+/** Utility methods for {@link StreamExecutionEnvironment}. */
+public class StreamExecutionEnvironmentUtils {
+
+    public static ReadableConfig getConfiguration(StreamExecutionEnvironment env) {
+        // For Flink 1.14 we have to use reflection to get configuration from execution environment.
+        // See FLINK-26709 for more info.
+        if (env.getClass()
+                .getName()
+                .equals("org.apache.flink.table.planner.utils.DummyStreamExecutionEnvironment")) {
+            try {
+                Field realExecEnvField = env.getClass().getDeclaredField("realExecEnv");
+                realExecEnvField.setAccessible(true);
+                env = (StreamExecutionEnvironment) realExecEnvField.get(env);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new RuntimeException(
+                        "Failed to get realExecEnv from DummyStreamExecutionEnvironment "
+                                + "by Java reflection. This is unexpected.",
+                        e);
+            }
+        }
+        return env.getConfiguration();
+    }
+}
diff --git a/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java
new file mode 100644
index 00000000..2af52bae
--- /dev/null
+++ b/flink-table-store-connector/src/main/1.15.0/org/apache/flink/table/store/connector/utils/StreamExecutionEnvironmentUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.utils;
+
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+/** Utility methods for {@link StreamExecutionEnvironment}. */
+public class StreamExecutionEnvironmentUtils {
+
+    public static ReadableConfig getConfiguration(StreamExecutionEnvironment env) {
+        return env.getConfiguration();
+    }
+}
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index 82b8e4e2..8613d6e4 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.table.store.file.manifest.ManifestCommittableSerializer;
 import org.apache.flink.table.store.file.operation.Lock;
@@ -103,7 +104,8 @@ public class StoreSink implements Serializable {
 
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean streamingCheckpointEnabled =
-                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
+                StreamExecutionEnvironmentUtils.getConfiguration(env)
+                                        .get(ExecutionOptions.RUNTIME_MODE)
                                 == RuntimeExecutionMode.STREAMING
                         && env.getCheckpointConfig().isCheckpointingEnabled();
         SingleOutputStreamOperator<?> committed =