You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/05/23 08:01:28 UTC

[flink] 03/06: [hotfix][table-common] Fix equality of data types with same conversion class

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a71c200b470ffe7ae614870d5855d1e24673c08b
Author: Timo Walther <tw...@apache.org>
AuthorDate: Tue May 21 13:35:46 2019 +0200

    [hotfix][table-common] Fix equality of data types with same conversion class
---
 .../flink/table/types/CollectionDataType.java      | 26 +++++++++++++---------
 .../org/apache/flink/table/types/DataType.java     | 20 +++++++++++------
 .../org/apache/flink/table/types/DataTypeTest.java |  6 +++++
 3 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
index 17b7096..b50fea9 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/CollectionDataType.java
@@ -43,7 +43,7 @@ public final class CollectionDataType extends DataType {
 			LogicalType logicalType,
 			@Nullable Class<?> conversionClass,
 			DataType elementDataType) {
-		super(logicalType, conversionClass);
+		super(logicalType, ensureArrayConversionClass(logicalType, elementDataType, conversionClass));
 		this.elementDataType = Preconditions.checkNotNull(elementDataType, "Element data type must not be null.");
 	}
 
@@ -82,16 +82,6 @@ public final class CollectionDataType extends DataType {
 	}
 
 	@Override
-	public Class<?> getConversionClass() {
-		// arrays are a special case because their default conversion class depends on the
-		// conversion class of the element type
-		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && conversionClass == null) {
-			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
-		}
-		return super.getConversionClass();
-	}
-
-	@Override
 	public <R> R accept(DataTypeVisitor<R> visitor) {
 		return visitor.visit(this);
 	}
@@ -115,4 +105,18 @@ public final class CollectionDataType extends DataType {
 	public int hashCode() {
 		return Objects.hash(super.hashCode(), elementDataType);
 	}
+
+	// --------------------------------------------------------------------------------------------
+
+	private static Class<?> ensureArrayConversionClass(
+			LogicalType logicalType,
+			DataType elementDataType,
+			@Nullable Class<?> clazz) {
+		// arrays are a special case because their default conversion class depends on the
+		// conversion class of the element type
+		if (logicalType.getTypeRoot() == LogicalTypeRoot.ARRAY && clazz == null) {
+			return Array.newInstance(elementDataType.getConversionClass(), 0).getClass();
+		}
+		return clazz;
+	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
index 303a052..6b783e44 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/DataType.java
@@ -52,13 +52,15 @@ import java.util.Objects;
 @PublicEvolving
 public abstract class DataType implements Serializable {
 
-	protected LogicalType logicalType;
+	protected final LogicalType logicalType;
 
-	protected @Nullable Class<?> conversionClass;
+	protected final Class<?> conversionClass;
 
 	DataType(LogicalType logicalType, @Nullable Class<?> conversionClass) {
 		this.logicalType = Preconditions.checkNotNull(logicalType, "Logical type must not be null.");
-		this.conversionClass = performEarlyClassValidation(logicalType, conversionClass);
+		this.conversionClass = performEarlyClassValidation(
+			logicalType,
+			ensureConversionClass(logicalType, conversionClass));
 	}
 
 	/**
@@ -79,9 +81,6 @@ public abstract class DataType implements Serializable {
 	 * @return the expected conversion class
 	 */
 	public Class<?> getConversionClass() {
-		if (conversionClass == null) {
-			return logicalType.getDefaultConversion();
-		}
 		return conversionClass;
 	}
 
@@ -133,7 +132,7 @@ public abstract class DataType implements Serializable {
 		}
 		DataType dataType = (DataType) o;
 		return logicalType.equals(dataType.logicalType) &&
-			Objects.equals(conversionClass, dataType.conversionClass);
+			conversionClass.equals(dataType.conversionClass);
 	}
 
 	@Override
@@ -162,4 +161,11 @@ public abstract class DataType implements Serializable {
 		}
 		return candidate;
 	}
+
+	private static Class<?> ensureConversionClass(LogicalType logicalType, @Nullable Class<?> clazz) {
+		if (clazz == null) {
+			return logicalType.getDefaultConversion();
+		}
+		return clazz;
+	}
 }
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
index 2475db8..88b5cfd 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/DataTypeTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.types;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 
 import org.junit.Test;
@@ -128,4 +129,9 @@ public class DataTypeTest {
 	public void testInvalidOrderInterval() {
 		INTERVAL(MONTH(), YEAR(2));
 	}
+
+	@Test
+	public void testConversionEquality() {
+		assertEquals(DataTypes.VARCHAR(2).bridgedTo(String.class), DataTypes.VARCHAR(2));
+	}
 }