You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2021/12/01 08:05:25 UTC

[flink] 02/03: [FLINK-24596][core] Introduce SerializableFunction and unify usages

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b0f56390b5ae768851c58b811c3b0f3dec96f294
Author: Fabian Paul <fa...@ververica.com>
AuthorDate: Fri Nov 26 11:42:36 2021 +0100

    [FLINK-24596][core] Introduce SerializableFunction and unify usages
---
 .../streaming/connectors/elasticsearch/table/KeyExtractor.java     | 3 ++-
 .../org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java | 3 ++-
 .../java/org/apache/flink/util/function}/SerializableFunction.java | 2 +-
 .../runtime/operators/coordination/TestingOperatorCoordinator.java | 2 +-
 .../java/org/apache/flink/runtime/util/SerializableFunction.java   | 7 ++++++-
 .../flink/connector/datagen/table/types/DataGeneratorMapper.java   | 7 +------
 6 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
index ae7c522..ca9c297 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.DistinctType;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.SerializableFunction;
 
 import java.io.Serializable;
 import java.time.Duration;
@@ -37,7 +38,7 @@ import java.util.function.Function;
 
 /** An extractor for a Elasticsearch key from a {@link RowData}. */
 @Internal
-class KeyExtractor implements Function<RowData, String>, Serializable {
+class KeyExtractor implements SerializableFunction<RowData, String> {
     private final FieldFormatter[] fieldFormatters;
     private final String keyDelimiter;
 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
index 121800e..e7b7514 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java
@@ -40,6 +40,7 @@ import org.apache.flink.connector.jdbc.utils.JdbcUtils;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.function.SerializableFunction;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -99,7 +100,7 @@ public class JdbcOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStatementExe
      * @param <T> The type of instance.
      */
     public interface StatementExecutorFactory<T extends JdbcBatchStatementExecutor<?>>
-            extends Function<RuntimeContext, T>, Serializable {}
+            extends SerializableFunction<RuntimeContext, T> {}
 
     private static final long serialVersionUID = 1L;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java b/flink-core/src/main/java/org/apache/flink/util/function/SerializableFunction.java
similarity index 96%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
copy to flink-core/src/main/java/org/apache/flink/util/function/SerializableFunction.java
index 75d7f91..2eee42e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/util/function/SerializableFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.util;
+package org.apache.flink.util.function;
 
 import org.apache.flink.annotation.Public;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 50b3046..15d0dc6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.operators.coordination;
 
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.util.SerializableFunction;
+import org.apache.flink.util.function.SerializableFunction;
 
 import javax.annotation.Nullable;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
index 75d7f91..00d2326 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializableFunction.java
@@ -23,7 +23,12 @@ import org.apache.flink.annotation.Public;
 import java.io.Serializable;
 import java.util.function.Function;
 
-/** A {@link Function} that is also {@link Serializable}. */
+/**
+ * A {@link Function} that is also {@link Serializable}.
+ *
+ * @deprecated Please use {@link org.apache.flink.util.function.SerializableFunction}
+ */
+@Deprecated
 @Public
 @FunctionalInterface
 public interface SerializableFunction<T, R> extends Function<T, R>, Serializable {}
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java
index 19fb9b4..4e1c61d 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/connector/datagen/table/types/DataGeneratorMapper.java
@@ -22,9 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
-
-import java.io.Serializable;
-import java.util.function.Function;
+import org.apache.flink.util.function.SerializableFunction;
 
 /** Utility for mapping the output of a {@link DataGenerator}. */
 @Internal
@@ -55,7 +53,4 @@ public class DataGeneratorMapper<A, B> implements DataGenerator<B> {
     public B next() {
         return mapper.apply(generator.next());
     }
-
-    /** A simple serializable function. */
-    public interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}
 }