You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:26 UTC

[flink] 02/07: [hotfix] Extract joda conversions to a separate class

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09616cba088da907f6d279f959ea8d42e0434e9b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:23:29 2020 +0200

    [hotfix] Extract joda conversions to a separate class
---
 .../formats/avro/AvroRowDeserializationSchema.java | 53 +++++++++++------
 .../formats/avro/AvroToRowDataConverters.java      | 38 ++++++------
 .../apache/flink/formats/avro/JodaConverter.java   | 67 ++++++++++++++++++++++
 3 files changed, 124 insertions(+), 34 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5e3edf7..fb7a74e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -42,10 +42,8 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -124,6 +122,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 	private transient Decoder decoder;
 
 	/**
+	 * Converter for joda classes.
+	 */
+	private transient @Nullable JodaConverter jodaConverter;
+
+	/**
 	 * Creates a Avro deserialization schema for the given specific record class. Having the
 	 * concrete Avro record class might improve performance.
 	 *
@@ -139,6 +142,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new SpecificDatumReader<>(schema);
 		inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 
 	/**
@@ -158,6 +162,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new GenericDatumReader<>(schema);
 		inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 
 	@Override
@@ -196,7 +201,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 
 	// --------------------------------------------------------------------------------------------
 
-	private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+	private Row convertAvroRecordToRow(
+			Schema schema,
+			RowTypeInfo typeInfo,
+			IndexedRecord record) {
 		final List<Schema.Field> fields = schema.getFields();
 		final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
 		final int length = fields.size();
@@ -208,7 +216,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		return row;
 	}
 
-	private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+	private Object convertAvroType(
+			Schema schema,
+			TypeInformation<?> info,
+			Object object) {
 		// we perform the conversion based on schema information but enriched with pre-computed
 		// type information where useful (i.e., for arrays)
 
@@ -239,7 +250,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 				for (Map.Entry<?, ?> entry : map.entrySet()) {
 					convertedMap.put(
 						entry.getKey().toString(),
-						convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
+						convertAvroType(
+							schema.getValueType(),
+							mapTypeInfo.getValueTypeInfo(),
+							entry.getValue())
+					);
 				}
 				return convertedMap;
 			case UNION:
@@ -302,10 +317,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 			// adopted from Apache Calcite
 			final long t = (long) value * 86400000L;
 			millis = t - (long) LOCAL_TZ.getOffset(t);
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertDate(object);
 		} else {
-			// use 'provided' Joda time
-			final LocalDate value = (LocalDate) object;
-			millis = value.toDate().getTime();
+			throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
 		}
 		return new Date(millis);
 	}
@@ -314,10 +329,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Integer) {
 			millis = (Integer) object;
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertTime(object);
 		} else {
-			// use 'provided' Joda time
-			final LocalTime value = (LocalTime) object;
-			millis = (long) value.get(DateTimeFieldType.millisOfDay());
+			throw new IllegalArgumentException("Unexpected object type for TIME logical type. Received: " + object);
 		}
 		return new Time(millis - LOCAL_TZ.getOffset(millis));
 	}
@@ -326,15 +341,18 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		final long millis;
 		if (object instanceof Long) {
 			millis = (Long) object;
+		} else if (jodaConverter != null) {
+			millis = jodaConverter.convertTimestamp(object);
 		} else {
-			// use 'provided' Joda time
-			final DateTime value = (DateTime) object;
-			millis = value.toDate().getTime();
+			throw new IllegalArgumentException("Unexpected object type for TIMESTAMP logical type. Received: " + object);
 		}
 		return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
 	}
 
-	private Object[] convertToObjectArray(Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+	private Object[] convertToObjectArray(
+			Schema elementSchema,
+			TypeInformation<?> elementInfo,
+			Object object) {
 		final List<?> list = (List<?>) object;
 		final Object[] convertedArray = (Object[]) Array.newInstance(
 			elementInfo.getTypeClass(),
@@ -364,5 +382,6 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		datumReader = new SpecificDatumReader<>(schema);
 		this.inputStream = new MutableByteArrayInputStream();
 		decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
+		jodaConverter = JodaConverter.getConverter();
 	}
 }
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 57f7629..b340cba 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -36,9 +36,6 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
 
 import java.io.Serializable;
 import java.lang.reflect.Array;
@@ -49,7 +46,6 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
-import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY;
 
 /** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/
 @Internal
@@ -178,10 +174,8 @@ public class AvroToRowDataConverters {
 	}
 
 	private static AvroToRowDataConverter createMapConverter(LogicalType type) {
-		final AvroToRowDataConverter keyConverter = createConverter(
-				DataTypes.STRING().getLogicalType());
-		final AvroToRowDataConverter valueConverter = createConverter(
-				extractValueTypeToAvroMap(type));
+		final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType());
+		final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type));
 
 		return avroObject -> {
 			final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -200,9 +194,13 @@ public class AvroToRowDataConverters {
 		if (object instanceof Long) {
 			millis = (Long) object;
 		} else {
-			// use 'provided' Joda time
-			final DateTime value = (DateTime) object;
-			millis = value.toDate().getTime();
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				millis = jodaConverter.convertTimestamp(object);
+			} else {
+				throw new IllegalArgumentException(
+					"Unexpected object type for TIMESTAMP logical type. Received: " + object);
+			}
 		}
 		return toTimestampData(millis);
 	}
@@ -211,9 +209,12 @@ public class AvroToRowDataConverters {
 		if (object instanceof Integer) {
 			return (Integer) object;
 		} else {
-			// use 'provided' Joda time
-			final LocalDate value = (LocalDate) object;
-			return (int) (toTimestampData(value.toDate().getTime()).getMillisecond() / MILLIS_PER_DAY);
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				return (int) jodaConverter.convertDate(object);
+			} else {
+				throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
+			}
 		}
 	}
 
@@ -226,9 +227,12 @@ public class AvroToRowDataConverters {
 		if (object instanceof Integer) {
 			millis = (Integer) object;
 		} else {
-			// use 'provided' Joda time
-			final org.joda.time.LocalTime value = (org.joda.time.LocalTime) object;
-			millis = value.get(DateTimeFieldType.millisOfDay());
+			JodaConverter jodaConverter = JodaConverter.getConverter();
+			if (jodaConverter != null) {
+				millis = jodaConverter.convertTime(object);
+			} else {
+				throw new IllegalArgumentException("Unexpected object type for TIME logical type. Received: " + object);
+			}
 		}
 		return millis;
 	}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
new file mode 100644
index 0000000..85cbabc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates the optional joda dependency. An instance is only created if joda is available on the classpath.
+ */
+class JodaConverter {
+
+	// Cached probe result; volatile so that the outcome computed by one thread is visible to all others.
+	private static volatile JodaConverter instance;
+	private static volatile boolean instantiated = false;
+
+	/** Returns the shared converter, or {@code null} if joda cannot be loaded via the context class loader. */
+	public static JodaConverter getConverter() {
+		if (instantiated) {
+			return instance;
+		}
+		try {
+			Class.forName("org.joda.time.DateTime", false, Thread.currentThread().getContextClassLoader());
+			instance = new JodaConverter();
+		} catch (ClassNotFoundException e) {
+			instance = null;
+		} finally {
+			instantiated = true;
+		}
+		return instance;
+	}
+	/** Returns {@code toDate().getTime()} of the given joda {@link LocalDate}; any other type fails the cast. */
+	public long convertDate(Object object) {
+		final LocalDate value = (LocalDate) object;
+		return value.toDate().getTime();
+	}
+	/** Returns the millis-of-day field of the given joda {@link LocalTime}; any other type fails the cast. */
+	public int convertTime(Object object) {
+		final LocalTime value = (LocalTime) object;
+		return value.get(DateTimeFieldType.millisOfDay());
+	}
+	/** Returns {@code toDate().getTime()} of the given joda {@link DateTime}; any other type fails the cast. */
+	public long convertTimestamp(Object object) {
+		final DateTime value = (DateTime) object;
+		return value.toDate().getTime();
+	}
+
+	private JodaConverter() {}
+}