You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/07 09:07:48 UTC

[flink] branch master updated: [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d946302  [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix
d946302 is described below

commit d9463022504a6bccad30d681c71f46658c073041
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Wed Dec 1 14:54:36 2021 +0100

    [FLINK-25114][table-runtime] Remove flink-scala dependency and scala suffix
    
    This closes #18011.
---
 flink-architecture-tests/pom.xml                   |  2 +-
 flink-connectors/flink-connector-hive/pom.xml      |  2 +-
 .../flink-avro-confluent-registry/pom.xml          |  2 +-
 flink-python/pom.xml                               |  4 +-
 flink-table/flink-sql-client/pom.xml               |  2 +-
 flink-table/flink-table-planner/pom.xml            |  4 +-
 flink-table/flink-table-runtime/pom.xml            | 10 +---
 .../table/data/util/DataFormatConverters.java      | 66 +++++++++++++++++++---
 flink-table/flink-table-uber/pom.xml               |  4 +-
 9 files changed, 68 insertions(+), 28 deletions(-)

diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml
index 23330f9..d981444 100644
--- a/flink-architecture-tests/pom.xml
+++ b/flink-architecture-tests/pom.xml
@@ -116,7 +116,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
diff --git a/flink-connectors/flink-connector-hive/pom.xml b/flink-connectors/flink-connector-hive/pom.xml
index 7b022ab..37f3dea 100644
--- a/flink-connectors/flink-connector-hive/pom.xml
+++ b/flink-connectors/flink-connector-hive/pom.xml
@@ -147,7 +147,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml
index 59fc1d1..6dda873 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -113,7 +113,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
diff --git a/flink-python/pom.xml b/flink-python/pom.xml
index 8007026..53d0acd 100644
--- a/flink-python/pom.xml
+++ b/flink-python/pom.xml
@@ -76,7 +76,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
@@ -190,7 +190,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index 6254ae9..44ec8ad 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -86,7 +86,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
diff --git a/flink-table/flink-table-planner/pom.xml b/flink-table/flink-table-planner/pom.xml
index 26912eb..c971579 100644
--- a/flink-table/flink-table-planner/pom.xml
+++ b/flink-table/flink-table-planner/pom.xml
@@ -113,7 +113,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 
@@ -269,7 +269,7 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 			<type>test-jar</type>
 			<scope>test</scope>
diff --git a/flink-table/flink-table-runtime/pom.xml b/flink-table/flink-table-runtime/pom.xml
index badc73a..a6a6f9a 100644
--- a/flink-table/flink-table-runtime/pom.xml
+++ b/flink-table/flink-table-runtime/pom.xml
@@ -27,7 +27,7 @@ under the License.
 		<relativePath>..</relativePath>
 	</parent>
 
-	<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+	<artifactId>flink-table-runtime</artifactId>
 	<name>Flink : Table : Runtime</name>
 	<description>
 		This module contains classes that are required by a task manager for
@@ -79,14 +79,6 @@ under the License.
 		</dependency>
 
 		<dependency>
-			<!-- Provides the kryo serializer -->
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-scala_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
 			<groupId>org.codehaus.janino</groupId>
 			<artifactId>janino</artifactId>
 			<version>${janino.version}</version>
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
index 4234183..3d9b3e0 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
@@ -70,8 +70,12 @@ import org.apache.flink.types.Row;
 
 import org.apache.commons.lang3.ArrayUtils;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.lang.reflect.Array;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Time;
@@ -86,8 +90,6 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
-import scala.Product;
-
 import static org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo;
 import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
 import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
@@ -291,10 +293,16 @@ public class DataFormatConverters {
                     return new RowConverter(fieldTypes);
                 } else if (Tuple.class.isAssignableFrom(clazz)) {
                     return new TupleConverter((Class<Tuple>) clazz, fieldTypes);
-                } else if (Product.class.isAssignableFrom(clazz)) {
+                } else if (CaseClassConverter.PRODUCT_CLASS != null
+                        && CaseClassConverter.PRODUCT_CLASS.isAssignableFrom(clazz)) {
                     return new CaseClassConverter((TupleTypeInfoBase) compositeType, fieldTypes);
-                } else {
+                } else if (compositeType instanceof PojoTypeInfo) {
                     return new PojoConverter((PojoTypeInfo) compositeType, fieldTypes);
+                } else {
+                    throw new IllegalStateException(
+                            "Cannot find a converter for type "
+                                    + compositeType
+                                    + ". If the target should be a converter to scala.Product, then you might have a scala classpath issue.");
                 }
             case RAW:
                 if (logicalType instanceof RawType) {
@@ -1501,10 +1509,13 @@ public class DataFormatConverters {
     }
 
     /** Converter for case class. */
-    public static final class CaseClassConverter extends AbstractRowDataConverter<Product> {
+    public static final class CaseClassConverter extends AbstractRowDataConverter<Object> {
 
         private static final long serialVersionUID = -966598627968372952L;
 
+        @Nullable private static final Class<?> PRODUCT_CLASS = getProductClass();
+        @Nullable private static final Method PRODUCT_ELEMENT_METHOD = getProductElementMethod();
+
         private final TupleTypeInfoBase t;
         private final TupleSerializerBase serializer;
 
@@ -1515,21 +1526,58 @@ public class DataFormatConverters {
         }
 
         @Override
-        RowData toInternalImpl(Product value) {
+        RowData toInternalImpl(Object value) {
             GenericRowData genericRow = new GenericRowData(t.getArity());
             for (int i = 0; i < t.getArity(); i++) {
-                genericRow.setField(i, converters[i].toInternal(value.productElement(i)));
+                genericRow.setField(i, converters[i].toInternal(invokeProductElement(value, i)));
             }
             return genericRow;
         }
 
         @Override
-        Product toExternalImpl(RowData value) {
+        Object toExternalImpl(RowData value) {
             Object[] fields = new Object[t.getArity()];
             for (int i = 0; i < t.getArity(); i++) {
                 fields[i] = converters[i].toExternal(value, i);
             }
-            return (Product) serializer.createInstance(fields);
+            return serializer.createInstance(fields);
+        }
+
+        private static Class<?> getProductClass() {
+            try {
+                return Class.forName(
+                        "scala.Product", false, Thread.currentThread().getContextClassLoader());
+            } catch (ClassNotFoundException e) {
+                // Ignore, no scala available in the classpath
+                return null;
+            }
+        }
+
+        private static Method getProductElementMethod() {
+            try {
+                if (PRODUCT_CLASS == null) {
+                    return null;
+                }
+                return PRODUCT_CLASS.getMethod("productElement", int.class);
+            } catch (NoSuchMethodException e) {
+                throw new IllegalStateException(
+                        "Cannot find scala.Product#productElement, has Scala changed its public API?",
+                        e);
+            }
+        }
+
+        private static Object invokeProductElement(Object value, int i) {
+            try {
+                if (PRODUCT_ELEMENT_METHOD == null) {
+                    throw new IllegalStateException(
+                            "PRODUCT_ELEMENT_METHOD is null, but it cannot be as this method should never be invoked if Scala is not in the classpath. Something is wrong with the classpath?");
+                }
+                return PRODUCT_ELEMENT_METHOD.invoke(value, i);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+                throw new IllegalStateException(
+                        "Cannot execute scala.Product#productElement, has Scala changed its public API?",
+                        e);
+            }
         }
     }
 
diff --git a/flink-table/flink-table-uber/pom.xml b/flink-table/flink-table-uber/pom.xml
index 9b985df..c51f8d2 100644
--- a/flink-table/flink-table-uber/pom.xml
+++ b/flink-table/flink-table-uber/pom.xml
@@ -80,7 +80,7 @@ under the License.
 		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-runtime_${scala.binary.version}</artifactId>
+			<artifactId>flink-table-runtime</artifactId>
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
@@ -113,7 +113,7 @@ under the License.
 									<include>org.apache.flink:flink-table-api-java-bridge</include>
 									<include>org.apache.flink:flink-table-api-scala-bridge_${scala.binary.version}</include>
 									<include>org.apache.flink:flink-table-planner_${scala.binary.version}</include>
-									<include>org.apache.flink:flink-table-runtime_${scala.binary.version}</include>
+									<include>org.apache.flink:flink-table-runtime</include>
 									<include>org.apache.flink:flink-cep</include>
 								</includes>
 							</artifactSet>