You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/09/14 19:48:26 UTC
[flink] 02/07: [hotfix] Extract joda conversions to a separate class
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 09616cba088da907f6d279f959ea8d42e0434e9b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Sep 10 10:23:29 2020 +0200
[hotfix] Extract joda conversions to a separate class
---
.../formats/avro/AvroRowDeserializationSchema.java | 53 +++++++++++------
.../formats/avro/AvroToRowDataConverters.java | 38 ++++++------
.../apache/flink/formats/avro/JodaConverter.java | 67 ++++++++++++++++++++++
3 files changed, 124 insertions(+), 34 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 5e3edf7..fb7a74e 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -42,10 +42,8 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
-import org.joda.time.LocalTime;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -124,6 +122,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
private transient Decoder decoder;
/**
+ * Converter for joda classes.
+ */
+ private transient @Nullable JodaConverter jodaConverter;
+
+ /**
* Creates a Avro deserialization schema for the given specific record class. Having the
* concrete Avro record class might improve performance.
*
@@ -139,6 +142,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
datumReader = new SpecificDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ jodaConverter = JodaConverter.getConverter();
}
/**
@@ -158,6 +162,7 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
datumReader = new GenericDatumReader<>(schema);
inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+ jodaConverter = JodaConverter.getConverter();
}
@Override
@@ -196,7 +201,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
// --------------------------------------------------------------------------------------------
- private Row convertAvroRecordToRow(Schema schema, RowTypeInfo typeInfo, IndexedRecord record) {
+ private Row convertAvroRecordToRow(
+ Schema schema,
+ RowTypeInfo typeInfo,
+ IndexedRecord record) {
final List<Schema.Field> fields = schema.getFields();
final TypeInformation<?>[] fieldInfo = typeInfo.getFieldTypes();
final int length = fields.size();
@@ -208,7 +216,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
return row;
}
- private Object convertAvroType(Schema schema, TypeInformation<?> info, Object object) {
+ private Object convertAvroType(
+ Schema schema,
+ TypeInformation<?> info,
+ Object object) {
// we perform the conversion based on schema information but enriched with pre-computed
// type information where useful (i.e., for arrays)
@@ -239,7 +250,11 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
for (Map.Entry<?, ?> entry : map.entrySet()) {
convertedMap.put(
entry.getKey().toString(),
- convertAvroType(schema.getValueType(), mapTypeInfo.getValueTypeInfo(), entry.getValue()));
+ convertAvroType(
+ schema.getValueType(),
+ mapTypeInfo.getValueTypeInfo(),
+ entry.getValue())
+ );
}
return convertedMap;
case UNION:
@@ -302,10 +317,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
// adopted from Apache Calcite
final long t = (long) value * 86400000L;
millis = t - (long) LOCAL_TZ.getOffset(t);
+ } else if (jodaConverter != null) {
+ millis = jodaConverter.convertDate(object);
} else {
- // use 'provided' Joda time
- final LocalDate value = (LocalDate) object;
- millis = value.toDate().getTime();
+ throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
}
return new Date(millis);
}
@@ -314,10 +329,10 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
final long millis;
if (object instanceof Integer) {
millis = (Integer) object;
+ } else if (jodaConverter != null) {
+ millis = jodaConverter.convertTime(object);
} else {
- // use 'provided' Joda time
- final LocalTime value = (LocalTime) object;
- millis = (long) value.get(DateTimeFieldType.millisOfDay());
+ throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
}
return new Time(millis - LOCAL_TZ.getOffset(millis));
}
@@ -326,15 +341,18 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
final long millis;
if (object instanceof Long) {
millis = (Long) object;
+ } else if (jodaConverter != null) {
+ millis = jodaConverter.convertTimestamp(object);
} else {
- // use 'provided' Joda time
- final DateTime value = (DateTime) object;
- millis = value.toDate().getTime();
+ throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
}
return new Timestamp(millis - LOCAL_TZ.getOffset(millis));
}
- private Object[] convertToObjectArray(Schema elementSchema, TypeInformation<?> elementInfo, Object object) {
+ private Object[] convertToObjectArray(
+ Schema elementSchema,
+ TypeInformation<?> elementInfo,
+ Object object) {
final List<?> list = (List<?>) object;
final Object[] convertedArray = (Object[]) Array.newInstance(
elementInfo.getTypeClass(),
@@ -364,5 +382,6 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
datumReader = new SpecificDatumReader<>(schema);
this.inputStream = new MutableByteArrayInputStream();
decoder = DecoderFactory.get().binaryDecoder(this.inputStream, null);
+ jodaConverter = JodaConverter.getConverter();
}
}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
index 57f7629..b340cba 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroToRowDataConverters.java
@@ -36,9 +36,6 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeFieldType;
-import org.joda.time.LocalDate;
import java.io.Serializable;
import java.lang.reflect.Array;
@@ -49,7 +46,6 @@ import java.util.List;
import java.util.Map;
import static org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.extractValueTypeToAvroMap;
-import static org.joda.time.DateTimeConstants.MILLIS_PER_DAY;
/** Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}. **/
@Internal
@@ -178,10 +174,8 @@ public class AvroToRowDataConverters {
}
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
- final AvroToRowDataConverter keyConverter = createConverter(
- DataTypes.STRING().getLogicalType());
- final AvroToRowDataConverter valueConverter = createConverter(
- extractValueTypeToAvroMap(type));
+ final AvroToRowDataConverter keyConverter = createConverter(DataTypes.STRING().getLogicalType());
+ final AvroToRowDataConverter valueConverter = createConverter(extractValueTypeToAvroMap(type));
return avroObject -> {
final Map<?, ?> map = (Map<?, ?>) avroObject;
@@ -200,9 +194,13 @@ public class AvroToRowDataConverters {
if (object instanceof Long) {
millis = (Long) object;
} else {
- // use 'provided' Joda time
- final DateTime value = (DateTime) object;
- millis = value.toDate().getTime();
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTimestamp(object);
+ } else {
+ throw new IllegalArgumentException(
+ "Unexpected object type for TIMESTAMP logical type. Received: " + object);
+ }
}
return toTimestampData(millis);
}
@@ -211,9 +209,12 @@ public class AvroToRowDataConverters {
if (object instanceof Integer) {
return (Integer) object;
} else {
- // use 'provided' Joda time
- final LocalDate value = (LocalDate) object;
- return (int) (toTimestampData(value.toDate().getTime()).getMillisecond() / MILLIS_PER_DAY);
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ return (int) jodaConverter.convertDate(object);
+ } else {
+ throw new IllegalArgumentException("Unexpected object type for DATE logical type. Received: " + object);
+ }
}
}
@@ -226,9 +227,12 @@ public class AvroToRowDataConverters {
if (object instanceof Integer) {
millis = (Integer) object;
} else {
- // use 'provided' Joda time
- final org.joda.time.LocalTime value = (org.joda.time.LocalTime) object;
- millis = value.get(DateTimeFieldType.millisOfDay());
+ JodaConverter jodaConverter = JodaConverter.getConverter();
+ if (jodaConverter != null) {
+ millis = jodaConverter.convertTime(object);
+ } else {
+ throw new IllegalArgumentException("Unexpected object type for TIME logical type. Received: " + object);
+ }
}
return millis;
}
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
new file mode 100644
index 0000000..85cbabc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates joda optional dependency. Instantiates this class only if joda is available on the classpath.
+ */
+class JodaConverter {
+
+ private static JodaConverter instance;
+ private static boolean instantiated = false;
+
+ public static JodaConverter getConverter() {
+ if (instantiated) {
+ return instance;
+ }
+
+ try {
+ Class.forName("org.joda.time.DateTime", false, Thread.currentThread().getContextClassLoader());
+ instance = new JodaConverter();
+ } catch (ClassNotFoundException e) {
+ instance = null;
+ } finally {
+ instantiated = true;
+ }
+ return instance;
+ }
+
+ public long convertDate(Object object) {
+ final LocalDate value = (LocalDate) object;
+ return value.toDate().getTime();
+ }
+
+ public int convertTime(Object object) {
+ final LocalTime value = (LocalTime) object;
+ return value.get(DateTimeFieldType.millisOfDay());
+ }
+
+ public long convertTimestamp(Object object) {
+ final DateTime value = (DateTime) object;
+ return value.toDate().getTime();
+ }
+
+ private JodaConverter() {
+ }
+}