You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/11/03 17:11:32 UTC
[09/21] flink git commit: [FLINK-7420] [avro] Move all Avro code to
flink-avro
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
deleted file mode 100644
index 6f03b12..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.serialization;
-
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.Encoder;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.Utf8;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-
-/**
- * Serialization schema that serializes {@link Row} over {@link SpecificRecord} into a Avro bytes.
- */
-public class AvroRowSerializationSchema implements SerializationSchema<Row> {
-
- /**
- * Avro record class.
- */
- private Class<? extends SpecificRecord> recordClazz;
-
- /**
- * Avro serialization schema.
- */
- private transient Schema schema;
-
- /**
- * Writer to serialize Avro record into a byte array.
- */
- private transient DatumWriter<GenericRecord> datumWriter;
-
- /**
- * Output stream to serialize records into byte array.
- */
- private transient ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
-
- /**
- * Low-level class for serialization of Avro values.
- */
- private transient Encoder encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
-
- /**
- * Creates a Avro serialization schema for the given schema.
- *
- * @param recordClazz Avro record class used to deserialize Avro's record to Flink's row
- */
- public AvroRowSerializationSchema(Class<? extends SpecificRecord> recordClazz) {
- Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
- this.recordClazz = recordClazz;
- this.schema = SpecificData.get().getSchema(recordClazz);
- this.datumWriter = new SpecificDatumWriter<>(schema);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public byte[] serialize(Row row) {
- // convert to record
- final Object record = convertToRecord(schema, row);
-
- // write
- try {
- arrayOutputStream.reset();
- datumWriter.write((GenericRecord) record, encoder);
- encoder.flush();
- return arrayOutputStream.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException("Failed to serialize Row.", e);
- }
- }
-
- private void writeObject(ObjectOutputStream oos) throws IOException {
- oos.writeObject(recordClazz);
- }
-
- @SuppressWarnings("unchecked")
- private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
- this.recordClazz = (Class<? extends SpecificRecord>) ois.readObject();
- this.schema = SpecificData.get().getSchema(recordClazz);
- this.datumWriter = new SpecificDatumWriter<>(schema);
- this.arrayOutputStream = new ByteArrayOutputStream();
- this.encoder = EncoderFactory.get().binaryEncoder(arrayOutputStream, null);
- }
-
- /**
- * Converts a (nested) Flink Row into Avro's {@link GenericRecord}.
- * Strings are converted into Avro's {@link Utf8} fields.
- */
- private static Object convertToRecord(Schema schema, Object rowObj) {
- if (rowObj instanceof Row) {
- // records can be wrapped in a union
- if (schema.getType() == Schema.Type.UNION) {
- final List<Schema> types = schema.getTypes();
- if (types.size() == 2 && types.get(0).getType() == Schema.Type.NULL && types.get(1).getType() == Schema.Type.RECORD) {
- schema = types.get(1);
- }
- else if (types.size() == 2 && types.get(0).getType() == Schema.Type.RECORD && types.get(1).getType() == Schema.Type.NULL) {
- schema = types.get(0);
- }
- else {
- throw new RuntimeException("Currently we only support schemas of the following form: UNION[null, RECORD] or UNION[RECORD, NULL] Given: " + schema);
- }
- } else if (schema.getType() != Schema.Type.RECORD) {
- throw new RuntimeException("Record type for row type expected. But is: " + schema);
- }
- final List<Schema.Field> fields = schema.getFields();
- final GenericRecord record = new GenericData.Record(schema);
- final Row row = (Row) rowObj;
- for (int i = 0; i < fields.size(); i++) {
- final Schema.Field field = fields.get(i);
- record.put(field.pos(), convertToRecord(field.schema(), row.getField(i)));
- }
- return record;
- } else if (rowObj instanceof String) {
- return new Utf8((String) rowObj);
- } else {
- return rowObj;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
deleted file mode 100644
index 28f2ed3..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AvroRowDeSerializationSchemaTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
-import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema;
-import org.apache.flink.streaming.util.serialization.AvroRowSerializationSchema;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.InstantiationUtil;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Test for the Avro serialization and deserialization schema.
- */
-public class AvroRowDeSerializationSchemaTest {
-
- @Test
- public void testSerializeDeserializeSimpleRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testSerializeSimpleRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- serializationSchema.serialize(testData.f2);
- serializationSchema.serialize(testData.f2);
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testDeserializeRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getSimpleTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- deserializationSchema.deserialize(bytes);
- deserializationSchema.deserialize(bytes);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testSerializeDeserializeComplexRow() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testSerializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- serializationSchema.serialize(testData.f2);
- serializationSchema.serialize(testData.f2);
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testDeserializeComplexRowSeveralTimes() throws IOException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
- final AvroRowSerializationSchema serializationSchema = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserializationSchema = new AvroRowDeserializationSchema(testData.f0);
-
- final byte[] bytes = serializationSchema.serialize(testData.f2);
- deserializationSchema.deserialize(bytes);
- deserializationSchema.deserialize(bytes);
- final Row actual = deserializationSchema.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-
- @Test
- public void testSerializability() throws IOException, ClassNotFoundException {
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();
-
- final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
- final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);
-
- byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
- byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);
-
- AvroRowSerializationSchema serCopy =
- InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
- AvroRowDeserializationSchema deserCopy =
- InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());
-
- final byte[] bytes = serCopy.serialize(testData.f2);
- deserCopy.deserialize(bytes);
- deserCopy.deserialize(bytes);
- final Row actual = deserCopy.deserialize(bytes);
-
- assertEquals(testData.f2, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
index def16b2..871a6f6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSourceTestBase.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
-import org.apache.flink.streaming.connectors.kafka.testutils.AvroTestUtils;
+import org.apache.flink.formats.avro.utils.AvroTestUtils;
import org.apache.flink.table.api.Types;
import org.apache.avro.Schema;
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
deleted file mode 100644
index a41125a..0000000
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/AvroTestUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.io.avro.generated.Address;
-import org.apache.flink.api.io.avro.generated.Colors;
-import org.apache.flink.api.io.avro.generated.User;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.types.Row;
-
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.specific.SpecificRecord;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-
-/**
- * Utilities for creating Avro Schemas.
- */
-public final class AvroTestUtils {
-
- private static final String NAMESPACE = "org.apache.flink.streaming.connectors.kafka";
-
- /**
- * Creates a flat Avro Schema for testing.
- */
- public static Schema createFlatAvroSchema(String[] fieldNames, TypeInformation[] fieldTypes) {
- final SchemaBuilder.FieldAssembler<Schema> fieldAssembler = SchemaBuilder
- .record("BasicAvroRecord")
- .namespace(NAMESPACE)
- .fields();
-
- final Schema nullSchema = Schema.create(Schema.Type.NULL);
-
- for (int i = 0; i < fieldNames.length; i++) {
- Schema schema = ReflectData.get().getSchema(fieldTypes[i].getTypeClass());
- Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, schema));
- fieldAssembler.name(fieldNames[i]).type(unionSchema).noDefault();
- }
-
- return fieldAssembler.endRecord();
- }
-
- /**
- * Tests a simple Avro data types without nesting.
- */
- public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getSimpleTestData() {
- final Address addr = Address.newBuilder()
- .setNum(42)
- .setStreet("Main Street 42")
- .setCity("Test City")
- .setState("Test State")
- .setZip("12345")
- .build();
-
- final Row rowAddr = new Row(5);
- rowAddr.setField(0, 42);
- rowAddr.setField(1, "Main Street 42");
- rowAddr.setField(2, "Test City");
- rowAddr.setField(3, "Test State");
- rowAddr.setField(4, "12345");
-
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
- t.f0 = Address.class;
- t.f1 = addr;
- t.f2 = rowAddr;
-
- return t;
- }
-
- /**
- * Tests all Avro data types as well as nested types.
- */
- public static Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> getComplexTestData() {
- final Address addr = Address.newBuilder()
- .setNum(42)
- .setStreet("Main Street 42")
- .setCity("Test City")
- .setState("Test State")
- .setZip("12345")
- .build();
-
- final Row rowAddr = new Row(5);
- rowAddr.setField(0, 42);
- rowAddr.setField(1, "Main Street 42");
- rowAddr.setField(2, "Test City");
- rowAddr.setField(3, "Test State");
- rowAddr.setField(4, "12345");
-
- final User user = User.newBuilder()
- .setName("Charlie")
- .setFavoriteNumber(null)
- .setFavoriteColor("blue")
- .setTypeLongTest(1337L)
- .setTypeDoubleTest(1.337d)
- .setTypeNullTest(null)
- .setTypeBoolTest(false)
- .setTypeArrayString(new ArrayList<CharSequence>())
- .setTypeArrayBoolean(new ArrayList<Boolean>())
- .setTypeNullableArray(null)
- .setTypeEnum(Colors.RED)
- .setTypeMap(new HashMap<CharSequence, Long>())
- .setTypeFixed(null)
- .setTypeUnion(null)
- .setTypeNested(addr)
- .build();
-
- final Row rowUser = new Row(15);
- rowUser.setField(0, "Charlie");
- rowUser.setField(1, null);
- rowUser.setField(2, "blue");
- rowUser.setField(3, 1337L);
- rowUser.setField(4, 1.337d);
- rowUser.setField(5, null);
- rowUser.setField(6, false);
- rowUser.setField(7, new ArrayList<CharSequence>());
- rowUser.setField(8, new ArrayList<Boolean>());
- rowUser.setField(9, null);
- rowUser.setField(10, Colors.RED);
- rowUser.setField(11, new HashMap<CharSequence, Long>());
- rowUser.setField(12, null);
- rowUser.setField(13, null);
- rowUser.setField(14, rowAddr);
-
- final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> t = new Tuple3<>();
- t.f0 = User.class;
- t.f1 = user;
- t.f2 = rowUser;
-
- return t;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 97c9f20..7468b67 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -36,7 +36,6 @@ under the License.
<packaging>pom</packaging>
<modules>
- <module>flink-avro</module>
<module>flink-jdbc</module>
<module>flink-hadoop-compatibility</module>
<module>flink-hbase</module>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-core/pom.xml b/flink-core/pom.xml
index ae3f56e..0ca742c 100644
--- a/flink-core/pom.xml
+++ b/flink-core/pom.xml
@@ -80,12 +80,6 @@ under the License.
<!-- managed version -->
</dependency>
- <!-- Avro is needed for the interoperability with Avro types for serialization -->
- <dependency>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </dependency>
-
<!-- We explicitly depend on snappy since connectors that require it load it through the system class loader -->
<dependency>
<groupId>org.xerial.snappy</groupId>
@@ -128,7 +122,7 @@ under the License.
<scope>test</scope>
</dependency>
- </dependencies>
+ </dependencies>
<profiles>
<profile>
@@ -209,6 +203,7 @@ under the License.
<exclude>org.apache.flink.core.fs.FileSystem#isFlinkSupportedScheme(java.lang.String)</exclude>
<exclude>org.apache.flink.core.fs.FileSystem#setDefaultScheme(org.apache.flink.configuration.Configuration)</exclude>
<exclude>org.apache.flink.api.java.typeutils.WritableTypeInfo</exclude>
+ <exclude>org.apache.flink.api.java.typeutils.AvroTypeInfo</exclude>
<!-- Breaking changes between 1.1 and 1.2.
We ignore these changes because these are low-level, internal runtime configuration parameters -->
<exclude>org.apache.flink.configuration.ConfigConstants#DEFAULT_JOB_MANAGER_MAX_ATTEMPTS_HISTORY_SIZE</exclude>
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index fc66ccd..88d524e 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -571,16 +571,24 @@ public class ExecutionConfig implements Serializable, Archiveable<ArchivedExecut
}
/**
- * Force Flink to use the AvroSerializer for POJOs.
+ * Forces Flink to use the Apache Avro serializer for POJOs.
+ *
+ * <p><b>Important:</b> Make sure to include the <i>flink-avro</i> module.
*/
public void enableForceAvro() {
forceAvro = true;
}
+ /**
+ * Disables the Apache Avro serializer as the forced serializer for POJOs.
+ */
public void disableForceAvro() {
forceAvro = false;
}
+ /**
+ * Returns whether the Apache Avro serializer is forced as the serializer for POJOs.
+ */
public boolean isForceAvroEnabled() {
return forceAvro;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
deleted file mode 100644
index 1356e53..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/AvroTypeInfo.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.typeutils;
-
-import org.apache.avro.specific.SpecificRecordBase;
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Special type information to generate a special AvroTypeInfo for Avro POJOs (implementing SpecificRecordBase, the typed Avro POJOs)
- *
- * Proceeding: It uses a regular pojo type analysis and replaces all {@code GenericType<CharSequence>}
- * with a {@code GenericType<avro.Utf8>}.
- * All other types used by Avro are standard Java types.
- * Only strings are represented as CharSequence fields and represented as Utf8 classes at runtime.
- * CharSequence is not comparable. To make them nicely usable with field expressions, we replace them here
- * by generic type infos containing Utf8 classes (which are comparable),
- *
- * This class is checked by the AvroPojoTest.
- * @param <T>
- */
-@Public
-public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
- @PublicEvolving
- public AvroTypeInfo(Class<T> typeClass) {
- super(typeClass, generateFieldsFromAvroSchema(typeClass));
- }
-
- private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
- PojoTypeExtractor pte = new PojoTypeExtractor();
- ArrayList<Type> typeHierarchy = new ArrayList<>();
- typeHierarchy.add(typeClass);
- TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);
-
- if(!(ti instanceof PojoTypeInfo)) {
- throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
- }
- PojoTypeInfo pti = (PojoTypeInfo) ti;
- List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());
-
- for(int i = 0; i < pti.getArity(); i++) {
- PojoField f = pti.getPojoFieldAt(i);
- TypeInformation newType = f.getTypeInformation();
- // check if type is a CharSequence
- if(newType instanceof GenericTypeInfo) {
- if((newType).getTypeClass().equals(CharSequence.class)) {
- // replace the type by a org.apache.avro.util.Utf8
- newType = new GenericTypeInfo(org.apache.avro.util.Utf8.class);
- }
- }
- PojoField newField = new PojoField(f.getField(), newType);
- newFields.add(newField);
- }
- return newFields;
- }
-
- private static class PojoTypeExtractor extends TypeExtractor {
- private PojoTypeExtractor() {
- super();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 8a4fbbe..b24f425 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -27,12 +27,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
+import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
@@ -300,15 +301,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@Override
@PublicEvolving
+ @SuppressWarnings("unchecked")
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if(config.isForceKryoEnabled()) {
- return new KryoSerializer<T>(getTypeClass(), config);
+ return new KryoSerializer<>(getTypeClass(), config);
}
+
if(config.isForceAvroEnabled()) {
- return new AvroSerializer<T>(getTypeClass());
+ Class<?> clazz;
+ try {
+ clazz = Class.forName("org.apache.flink.formats.avro.typeutils.AvroSerializer");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException("Could not load the AvroSerializer class. " +
+ "You may be missing the 'flink-avro' dependency.");
+ }
+
+ try {
+ Constructor<?> constructor = clazz.getConstructor(Class.class);
+ return (TypeSerializer<T>) constructor.newInstance(getTypeClass());
+ } catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
+ throw new RuntimeException("Incompatible versions of the Avro classes found.");
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("Cannot create AvroSerializer.", e.getTargetException());
+ }
}
- TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ];
+ TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
Field[] reflectiveFields = new Field[fields.length];
for (int i = 0; i < fields.length; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
index 41d260d..c5c2565 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractionUtils.java
@@ -287,4 +287,39 @@ public class TypeExtractionUtils {
((TypeVariable<?>) t1).getName().equals(((TypeVariable<?>) t2).getName()) &&
((TypeVariable<?>) t1).getGenericDeclaration().equals(((TypeVariable<?>) t2).getGenericDeclaration());
}
+
+ /**
+ * Traverses the type hierarchy of a type up until a certain stop class is found.
+ *
+ * @param t type for which a hierarchy needs to be created
+ * @return the type at which the traversal stopped (the stop class), or null if the stop class was never reached
+ */
+ public static Type getTypeHierarchy(List<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
+ while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
+ typeHierarchy.add(t);
+ t = typeToClass(t).getGenericSuperclass();
+
+ if (t == null) {
+ break;
+ }
+ }
+ return t;
+ }
+
+ /**
+ * Returns true if the given class, or any of its superclasses below Object, has the given class name.
+ *
+ * @param clazz class to be analyzed
+ * @param superClassName class name of the super class
+ */
+ public static boolean hasSuperclass(Class<?> clazz, String superClassName) {
+ List<Type> hierarchy = new ArrayList<>();
+ getTypeHierarchy(hierarchy, clazz, Object.class);
+ for (Type t : hierarchy) {
+ if (isClassType(t) && typeToClass(t).getName().equals(superClassName)) {
+ return true;
+ }
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index c50dfc9..1a9cecb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,7 +18,6 @@
package org.apache.flink.api.java.typeutils;
-import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
@@ -73,6 +72,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getTypeHierarchy;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
@@ -114,6 +115,10 @@ public class TypeExtractor {
private static final String HADOOP_WRITABLE_TYPEINFO_CLASS = "org.apache.flink.api.java.typeutils.WritableTypeInfo";
+ private static final String AVRO_SPECIFIC_RECORD_BASE_CLASS = "org.apache.avro.specific.SpecificRecordBase";
+
+ private static final String AVRO_TYPEINFO_CLASS = "org.apache.flink.formats.avro.typeutils.AvroTypeInfo";
+
private static final Logger LOG = LoggerFactory.getLogger(TypeExtractor.class);
public static final int[] NO_INDEX = new int[] {};
@@ -1583,24 +1588,6 @@ public class TypeExtractor {
}
/**
- * Traverses the type hierarchy of a type up until a certain stop class is found.
- *
- * @param t type for which a hierarchy need to be created
- * @return type of the immediate child of the stop class
- */
- private static Type getTypeHierarchy(ArrayList<Type> typeHierarchy, Type t, Class<?> stopAtClass) {
- while (!(isClassType(t) && typeToClass(t).equals(stopAtClass))) {
- typeHierarchy.add(t);
- t = typeToClass(t).getGenericSuperclass();
-
- if (t == null) {
- break;
- }
- }
- return t;
- }
-
- /**
* Traverses the type hierarchy up until a type information factory can be found.
*
* @param typeHierarchy hierarchy to be filled while traversing up
@@ -1806,8 +1793,8 @@ public class TypeExtractor {
}
// special case for POJOs generated by Avro.
- if(SpecificRecordBase.class.isAssignableFrom(clazz)) {
- return new AvroTypeInfo(clazz);
+ if (hasSuperclass(clazz, AVRO_SPECIFIC_RECORD_BASE_CLASS)) {
+ return createAvroTypeInfo(clazz);
}
if (Modifier.isInterface(clazz.getModifiers())) {
@@ -2119,7 +2106,7 @@ public class TypeExtractor {
private static boolean hasHadoopWritableInterface(Class<?> clazz, HashSet<Class<?>> alreadySeen) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> c : interfaces) {
- if (c.getName().equals("org.apache.hadoop.io.Writable")) {
+ if (c.getName().equals(HADOOP_WRITABLE_CLASS)) {
return true;
}
else if (alreadySeen.add(c) && hasHadoopWritableInterface(c, alreadySeen)) {
@@ -2155,7 +2142,7 @@ public class TypeExtractor {
throw new RuntimeException("Incompatible versions of the Hadoop Compatibility classes found.");
}
catch (InvocationTargetException e) {
- throw new RuntimeException("Cannot create Hadoop Writable Type info", e.getTargetException());
+ throw new RuntimeException("Cannot create Hadoop WritableTypeInfo.", e.getTargetException());
}
}
@@ -2171,7 +2158,7 @@ public class TypeExtractor {
// this is actually a writable type info
// check if the type is a writable
if (!(type instanceof Class && isHadoopWritable((Class<?>) type))) {
- throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected");
+ throw new InvalidTypesException(HADOOP_WRITABLE_CLASS + " type expected.");
}
// check writable type contents
@@ -2188,4 +2175,33 @@ public class TypeExtractor {
// ignore
}
}
+
+ // ------------------------------------------------------------------------
+ // Utilities to handle Avro's 'SpecificRecord' type via reflection
+ // ------------------------------------------------------------------------
+
+ /**
+  * Creates the Avro type information for the given class via reflection.
+  *
+  * <p>The type information class is looked up by name ({@code AVRO_TYPEINFO_CLASS}) instead of
+  * being referenced directly, so this class compiles and loads without it on the classpath.
+  *
+  * @param clazz class that is handed to the single-argument {@code (Class)} constructor
+  * @param <T> type described by the returned type information
+  * @return the instance produced by the reflectively invoked constructor
+  * @throws RuntimeException if the type information class cannot be loaded, if it has no
+  *         usable public {@code (Class)} constructor, or if that constructor throws; the
+  *         underlying exception is attached as the cause in all three cases
+  */
+ private static <T> TypeInformation<T> createAvroTypeInfo(Class<T> clazz) {
+     Class<?> typeInfoClass;
+     try {
+         // 'false': only load the class here, do not run its static initializers yet
+         typeInfoClass = Class.forName(AVRO_TYPEINFO_CLASS, false, TypeExtractor.class.getClassLoader());
+     }
+     catch (ClassNotFoundException e) {
+         throw new RuntimeException("Could not load the TypeInformation for the class '"
+             + AVRO_TYPEINFO_CLASS + "'. You may be missing the 'flink-avro' dependency.", e);
+     }
+
+     try {
+         Constructor<?> constr = typeInfoClass.getConstructor(Class.class);
+
+         // NOTE(review): the cast assumes the loaded class is a TypeInformation subtype;
+         // nothing in this method verifies that before the cast.
+         @SuppressWarnings("unchecked")
+         TypeInformation<T> typeInfo = (TypeInformation<T>) constr.newInstance(clazz);
+         return typeInfo;
+     }
+     catch (NoSuchMethodException | IllegalAccessException | InstantiationException e) {
+         // keep the reflective failure as the cause so the mismatch can be diagnosed
+         throw new RuntimeException("Incompatible versions of the Avro classes found.", e);
+     }
+     catch (InvocationTargetException e) {
+         // unwrap: the constructor's own exception is the interesting one
+         throw new RuntimeException("Cannot create AvroTypeInfo.", e.getTargetException());
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
deleted file mode 100644
index 565bd4d..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ /dev/null
@@ -1,332 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.util.Utf8;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
-import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.esotericsoftware.kryo.Kryo;
-import org.apache.flink.util.Preconditions;
-import org.objenesis.strategy.StdInstantiatorStrategy;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * General purpose serialization. Currently using Apache Avro's Reflect-serializers for serialization and
- * Kryo for deep object copies. We want to change this to Kryo-only.
- *
- * @param <T> The type serialized.
- */
-@Internal
-public final class AvroSerializer<T> extends TypeSerializer<T> {
-
- private static final long serialVersionUID = 1L;
-
- private final Class<T> type;
-
- private final Class<? extends T> typeToInstantiate;
-
- /**
- * Map of class tag (using classname as tag) to their Kryo registration.
- *
- * <p>This map serves as a preview of the final registration result of
- * the Kryo instance, taking into account registration overwrites.
- */
- private LinkedHashMap<String, KryoRegistration> kryoRegistrations;
-
- private transient ReflectDatumWriter<T> writer;
- private transient ReflectDatumReader<T> reader;
-
- private transient DataOutputEncoder encoder;
- private transient DataInputDecoder decoder;
-
- private transient Kryo kryo;
-
- private transient T deepCopyInstance;
-
- // --------------------------------------------------------------------------------------------
-
- public AvroSerializer(Class<T> type) {
- this(type, type);
- }
-
- public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
- this.type = checkNotNull(type);
- this.typeToInstantiate = checkNotNull(typeToInstantiate);
-
- InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
- this.kryoRegistrations = buildKryoRegistrations(type);
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public boolean isImmutableType() {
- return false;
- }
-
- @Override
- public AvroSerializer<T> duplicate() {
- return new AvroSerializer<T>(type, typeToInstantiate);
- }
-
- @Override
- public T createInstance() {
- return InstantiationUtil.instantiate(this.typeToInstantiate);
- }
-
- @Override
- public T copy(T from) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, kryo, this);
- }
-
- @Override
- public T copy(T from, T reuse) {
- checkKryoInitialized();
-
- return KryoUtils.copy(from, reuse, kryo, this);
- }
-
- @Override
- public int getLength() {
- return -1;
- }
-
- @Override
- public void serialize(T value, DataOutputView target) throws IOException {
- checkAvroInitialized();
- this.encoder.setOut(target);
- this.writer.write(value, this.encoder);
- }
-
- @Override
- public T deserialize(DataInputView source) throws IOException {
- checkAvroInitialized();
- this.decoder.setIn(source);
- return this.reader.read(null, this.decoder);
- }
-
- @Override
- public T deserialize(T reuse, DataInputView source) throws IOException {
- checkAvroInitialized();
- this.decoder.setIn(source);
- return this.reader.read(reuse, this.decoder);
- }
-
- @Override
- public void copy(DataInputView source, DataOutputView target) throws IOException {
- checkAvroInitialized();
-
- if (this.deepCopyInstance == null) {
- this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
- }
-
- this.decoder.setIn(source);
- this.encoder.setOut(target);
-
- T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
- this.writer.write(tmp, this.encoder);
- }
-
-
- private void checkAvroInitialized() {
- if (this.reader == null) {
- this.reader = new ReflectDatumReader<T>(type);
- this.writer = new ReflectDatumWriter<T>(type);
- this.encoder = new DataOutputEncoder();
- this.decoder = new DataInputDecoder();
- }
- }
-
- private void checkKryoInitialized() {
- if (this.kryo == null) {
- this.kryo = new Kryo();
-
- Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
- instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
- kryo.setInstantiatorStrategy(instantiatorStrategy);
-
- kryo.setAsmEnabled(true);
-
- KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int hashCode() {
- return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof AvroSerializer) {
- @SuppressWarnings("unchecked")
- AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
-
- return avroSerializer.canEqual(this) &&
- type == avroSerializer.type &&
- typeToInstantiate == avroSerializer.typeToInstantiate;
- } else {
- return false;
- }
- }
-
- @Override
- public boolean canEqual(Object obj) {
- return obj instanceof AvroSerializer;
- }
-
- // --------------------------------------------------------------------------------------------
- // Serializer configuration snapshotting & compatibility
- // --------------------------------------------------------------------------------------------
-
- @Override
- public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
- return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
- final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;
-
- if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
- // resolve Kryo registrations; currently, since the Kryo registrations in Avro
- // are fixed, there shouldn't be a problem with the resolution here.
-
- LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
- oldRegistrations.putAll(kryoRegistrations);
-
- for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
- if (reconfiguredRegistrationEntry.getValue().isDummy()) {
- return CompatibilityResult.requiresMigration();
- }
- }
-
- this.kryoRegistrations = oldRegistrations;
- return CompatibilityResult.compatible();
- }
- }
-
- // ends up here if the preceding serializer is not
- // the ValueSerializer, or serialized data type has changed
- return CompatibilityResult.requiresMigration();
- }
-
- public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
-
- private static final int VERSION = 1;
-
- private Class<? extends T> typeToInstantiate;
-
- public AvroSerializerConfigSnapshot() {}
-
- public AvroSerializerConfigSnapshot(
- Class<T> baseType,
- Class<? extends T> typeToInstantiate,
- LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
-
- super(baseType, kryoRegistrations);
- this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
- }
-
- @Override
- public void write(DataOutputView out) throws IOException {
- super.write(out);
-
- out.writeUTF(typeToInstantiate.getName());
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public void read(DataInputView in) throws IOException {
- super.read(in);
-
- String classname = in.readUTF();
- try {
- typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
- } catch (ClassNotFoundException e) {
- throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
- }
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- public Class<? extends T> getTypeToInstantiate() {
- return typeToInstantiate;
- }
- }
-
- // --------------------------------------------------------------------------------------------
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
-
- // kryoRegistrations may be null if this Avro serializer is deserialized from an old version
- if (kryoRegistrations == null) {
- this.kryoRegistrations = buildKryoRegistrations(type);
- }
- }
-
- private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
- final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();
-
- // register Avro types.
- registrations.put(
- GenericData.Array.class.getName(),
- new KryoRegistration(
- GenericData.Array.class,
- new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
- registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
- registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
- registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
- registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));
-
- // register the serialized data type
- registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));
-
- return registrations;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
deleted file mode 100644
index c0454c6..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataInputDecoder.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Decoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public class DataInputDecoder extends Decoder implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient Utf8 stringDecoder = new Utf8();
-
-
- private transient DataInput in;
-
-
- public void setIn(DataInput in) {
- this.in = in;
- }
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readNull() {}
-
-
- @Override
- public boolean readBoolean() throws IOException {
- return in.readBoolean();
- }
-
- @Override
- public int readInt() throws IOException {
- return in.readInt();
- }
-
- @Override
- public long readLong() throws IOException {
- return in.readLong();
- }
-
- @Override
- public float readFloat() throws IOException {
- return in.readFloat();
- }
-
- @Override
- public double readDouble() throws IOException {
- return in.readDouble();
- }
-
- @Override
- public int readEnum() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void readFixed(byte[] bytes, int start, int length) throws IOException {
- in.readFully(bytes, start, length);
- }
-
- @Override
- public ByteBuffer readBytes(ByteBuffer old) throws IOException {
- int length = readInt();
- ByteBuffer result;
- if (old != null && length <= old.capacity() && old.hasArray()) {
- result = old;
- result.clear();
- } else {
- result = ByteBuffer.allocate(length);
- }
- in.readFully(result.array(), result.arrayOffset() + result.position(), length);
- result.limit(length);
- return result;
- }
-
-
- @Override
- public void skipFixed(int length) throws IOException {
- skipBytes(length);
- }
-
- @Override
- public void skipBytes() throws IOException {
- int num = readInt();
- skipBytes(num);
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
-
- @Override
- public Utf8 readString(Utf8 old) throws IOException {
- int length = readInt();
- Utf8 result = (old != null ? old : new Utf8());
- result.setByteLength(length);
-
- if (length > 0) {
- in.readFully(result.getBytes(), 0, length);
- }
-
- return result;
- }
-
- @Override
- public String readString() throws IOException {
- return readString(stringDecoder).toString();
- }
-
- @Override
- public void skipString() throws IOException {
- int len = readInt();
- skipBytes(len);
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public long readArrayStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long arrayNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipArray() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long readMapStart() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long mapNext() throws IOException {
- return readVarLongCount(in);
- }
-
- @Override
- public long skipMap() throws IOException {
- return readVarLongCount(in);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public int readIndex() throws IOException {
- return readInt();
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
- private void skipBytes(int num) throws IOException {
- while (num > 0) {
- num -= in.skipBytes(num);
- }
- }
-
- public static long readVarLongCount(DataInput in) throws IOException {
- long value = in.readUnsignedByte();
-
- if ((value & 0x80) == 0) {
- return value;
- }
- else {
- long curr;
- int shift = 7;
- value = value & 0x7f;
- while (((curr = in.readUnsignedByte()) & 0x80) != 0){
- value |= (curr & 0x7f) << shift;
- shift += 7;
- }
- value |= curr << shift;
- return value;
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // serialization
- // --------------------------------------------------------------------------------------------
-
- private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
- // Read in size, and any hidden stuff
- s.defaultReadObject();
-
- this.stringDecoder = new Utf8();
- this.in = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
deleted file mode 100644
index c41b648..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/DataOutputEncoder.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import java.io.DataOutput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.avro.io.Encoder;
-import org.apache.avro.util.Utf8;
-import org.apache.flink.annotation.Internal;
-
-@Internal
-public final class DataOutputEncoder extends Encoder implements java.io.Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private transient DataOutput out;
-
-
- public void setOut(DataOutput out) {
- this.out = out;
- }
-
-
- @Override
- public void flush() throws IOException {}
-
- // --------------------------------------------------------------------------------------------
- // primitives
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeNull() {}
-
-
- @Override
- public void writeBoolean(boolean b) throws IOException {
- out.writeBoolean(b);
- }
-
- @Override
- public void writeInt(int n) throws IOException {
- out.writeInt(n);
- }
-
- @Override
- public void writeLong(long n) throws IOException {
- out.writeLong(n);
- }
-
- @Override
- public void writeFloat(float f) throws IOException {
- out.writeFloat(f);
- }
-
- @Override
- public void writeDouble(double d) throws IOException {
- out.writeDouble(d);
- }
-
- @Override
- public void writeEnum(int e) throws IOException {
- out.writeInt(e);
- }
-
-
- // --------------------------------------------------------------------------------------------
- // bytes
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeFixed(byte[] bytes, int start, int len) throws IOException {
- out.write(bytes, start, len);
- }
-
- @Override
- public void writeBytes(byte[] bytes, int start, int len) throws IOException {
- out.writeInt(len);
- if (len > 0) {
- out.write(bytes, start, len);
- }
- }
-
- @Override
- public void writeBytes(ByteBuffer bytes) throws IOException {
- int num = bytes.remaining();
- out.writeInt(num);
-
- if (num > 0) {
- writeFixed(bytes);
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // strings
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeString(String str) throws IOException {
- byte[] bytes = Utf8.getBytesFor(str);
- writeBytes(bytes, 0, bytes.length);
- }
-
- @Override
- public void writeString(Utf8 utf8) throws IOException {
- writeBytes(utf8.getBytes(), 0, utf8.getByteLength());
-
- }
-
- // --------------------------------------------------------------------------------------------
- // collection types
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeArrayStart() {}
-
- @Override
- public void setItemCount(long itemCount) throws IOException {
- if (itemCount > 0) {
- writeVarLongCount(out, itemCount);
- }
- }
-
- @Override
- public void startItem() {}
-
- @Override
- public void writeArrayEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- @Override
- public void writeMapStart() {}
-
- @Override
- public void writeMapEnd() throws IOException {
- // write a single byte 0, shortcut for a var-length long of 0
- out.write(0);
- }
-
- // --------------------------------------------------------------------------------------------
- // union
- // --------------------------------------------------------------------------------------------
-
- @Override
- public void writeIndex(int unionIndex) throws IOException {
- out.writeInt(unionIndex);
- }
-
- // --------------------------------------------------------------------------------------------
- // utils
- // --------------------------------------------------------------------------------------------
-
-
- public static final void writeVarLongCount(DataOutput out, long val) throws IOException {
- if (val < 0) {
- throw new IOException("Illegal count (must be non-negative): " + val);
- }
-
- while ((val & ~0x7FL) != 0) {
- out.write(((int) val) | 0x80);
- val >>>= 7;
- }
- out.write((int) val);
- }
-
- private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
- // Read in size, and any hidden stuff
- s.defaultReadObject();
-
- this.out = null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 6730136..269cf35 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -24,8 +24,6 @@ import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.generic.GenericData;
-
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -406,7 +404,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
// there's actually no way to tell if new Kryo serializers are compatible with
- // the previous ones they overwrite; we can only signal compatibly and hope for the best
+ // the previous ones they overwrite; we can only signal compatibility and hope for the best
this.kryoRegistrations = reconfiguredRegistrations;
return CompatibilityResult.compatible();
}
@@ -478,11 +476,8 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
registeredTypeWithSerializerEntry.getValue()));
}
- kryoRegistrations.put(
- GenericData.Array.class.getName(),
- new KryoRegistration(
- GenericData.Array.class,
- new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
+ // add Avro support if flink-avro is available; a dummy otherwise
+ Serializers.addAvroGenericDataArrayRegistration(kryoRegistrations);
return kryoRegistrations;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
index 4976d6a..de7b2fc 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java
@@ -18,16 +18,6 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.specific.SpecificRecordBase;
-
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -35,18 +25,29 @@ import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
+import org.apache.flink.api.java.typeutils.runtime.KryoRegistration;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.hasSuperclass;
+
/**
* Class containing utilities for the serializers of the Flink Runtime.
@@ -60,6 +61,14 @@ import java.util.Set;
@Internal
public class Serializers {
+ // Avro and flink-avro classes are referenced by name only, so that the methods of this class
+ // can check for them and load them reflectively.
+
+ /** Fully qualified name of Avro's SpecificRecordBase class. */
+ private static final String AVRO_SPECIFIC_RECORD_BASE = "org.apache.avro.specific.SpecificRecordBase";
+
+ /** Fully qualified (binary) name of Avro's GenericData.Record class. */
+ private static final String AVRO_GENERIC_RECORD = "org.apache.avro.generic.GenericData$Record";
+
+ /** Name of the flink-avro utility class that is loaded reflectively to add the Avro serializers. */
+ private static final String AVRO_KRYO_UTILS = "org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils";
+
+ /** Fully qualified (binary) name of Avro's GenericData.Array class. */
+ private static final String AVRO_GENERIC_DATA_ARRAY = "org.apache.avro.generic.GenericData$Array";
+
public static void recursivelyRegisterType(TypeInformation<?> typeInfo, ExecutionConfig config, Set<Class<?>> alreadySeen) {
if (typeInfo instanceof GenericTypeInfo) {
GenericTypeInfo<?> genericTypeInfo = (GenericTypeInfo<?>) typeInfo;
@@ -94,8 +103,11 @@ public class Serializers {
}
else {
config.registerKryoType(type);
- checkAndAddSerializerForTypeAvro(config, type);
-
+ // add serializers for Avro type if necessary
+ if (hasSuperclass(type, AVRO_SPECIFIC_RECORD_BASE) || hasSuperclass(type, AVRO_GENERIC_RECORD)) {
+ addAvroSerializers(config, type);
+ }
+
Field[] fields = type.getDeclaredFields();
for (Field field : fields) {
if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
@@ -147,20 +159,54 @@ public class Serializers {
}
}
}
-
- // ------------------------------------------------------------------------
-
- private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
- if (GenericData.Record.class.isAssignableFrom(type) || SpecificRecordBase.class.isAssignableFrom(type)) {
- // Avro POJOs contain java.util.List which have GenericData.Array as their runtime type
- // because Kryo is not able to serialize them properly, we use this serializer for them
- reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);
-
- // We register this serializer for users who want to use untyped Avro records (GenericData.Record).
- // Kryo is able to serialize everything in there, except for the Schema.
- // This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
- // we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
- reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
+
+ /**
+ * Loads the utility class from <code>flink-avro</code> via reflection and lets it add the
+ * Avro-specific serializers for the given type to the given configuration.
+ *
+ * @param reg the execution config that is handed to the <code>flink-avro</code> utility
+ * @param type the Avro type that is handed to the <code>flink-avro</code> utility
+ * @throws RuntimeException if the utility class cannot be loaded, or if its
+ * <code>addAvroSerializers</code> method cannot be found, accessed, or fails
+ */
+ private static void addAvroSerializers(ExecutionConfig reg, Class<?> type) {
+ Class<?> clazz;
+ try {
+ // 'false': look the class up without initializing it
+ clazz = Class.forName(AVRO_KRYO_UTILS, false, Serializers.class.getClassLoader());
+ }
+ catch (ClassNotFoundException e) {
+ // keep the original exception as the cause so the failed lookup stays visible in the stack trace
+ throw new RuntimeException("Could not load class '" + AVRO_KRYO_UTILS + "'. " +
+ "You may be missing the 'flink-avro' dependency.", e);
+ }
+ try {
+ // invoked with a null receiver, i.e. as a static method
+ clazz.getDeclaredMethod("addAvroSerializers", ExecutionConfig.class, Class.class).invoke(null, reg, type);
+ } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException("Could not access method in 'flink-avro' dependency.", e);
+ }
+ }
+
+ /**
+ * Adds a Kryo registration under the name of Avro's GenericData.Array class to the given map.
+ *
+ * <p>If the Avro class can be loaded, it is registered with the
+ * {@link SpecificInstanceCollectionSerializerForArrayList}. Otherwise the
+ * {@link DummyAvroRegisteredClass} is registered with the {@link DummyAvroKryoSerializerClass}
+ * in its place.
+ *
+ * @param kryoRegistrations the registrations to which the entry is added
+ */
+ @SuppressWarnings("unchecked")
+ public static void addAvroGenericDataArrayRegistration(LinkedHashMap<String, KryoRegistration> kryoRegistrations) {
+ final KryoRegistration arrayRegistration;
+ KryoRegistration candidate;
+ try {
+ Class<?> avroArrayClass = Class.forName(AVRO_GENERIC_DATA_ARRAY, false, Serializers.class.getClassLoader());
+ candidate = new KryoRegistration(
+ avroArrayClass,
+ new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList()));
+ }
+ catch (ClassNotFoundException ignored) {
+ // the Avro class could not be loaded; fall back to the placeholder classes
+ candidate = new KryoRegistration(DummyAvroRegisteredClass.class, (Class) DummyAvroKryoSerializerClass.class);
+ }
+ arrayRegistration = candidate;
+
+ // the entry is stored under the same key in both cases
+ kryoRegistrations.put(AVRO_GENERIC_DATA_ARRAY, arrayRegistration);
+ }
+
+ /**
+ * Placeholder class that is registered under the name of Avro's GenericData.Array
+ * when that class cannot be loaded.
+ */
+ public static class DummyAvroRegisteredClass {}
+
+ /**
+ * Placeholder Kryo serializer that is registered together with {@link DummyAvroRegisteredClass}
+ * when Avro's GenericData.Array cannot be loaded. It does not serialize anything: both
+ * methods throw an {@link UnsupportedOperationException}.
+ */
+ public static class DummyAvroKryoSerializerClass<T> extends Serializer<T> {
+ @Override
+ public void write(Kryo kryo, Output output, Object o) {
+ throw new UnsupportedOperationException("Could not find required Avro dependency.");
+ }
+
+ @Override
+ public T read(Kryo kryo, Input input, Class<T> aClass) {
+ throw new UnsupportedOperationException("Could not find required Avro dependency.");
}
}
@@ -168,6 +214,9 @@ public class Serializers {
// Custom Serializers
// --------------------------------------------------------------------------------------------
+ /**
+ * Special serializer for Java's {@link ArrayList} used for Avro's GenericData.Array.
+ */
@SuppressWarnings("rawtypes")
public static class SpecificInstanceCollectionSerializerForArrayList extends SpecificInstanceCollectionSerializer<ArrayList> {
private static final long serialVersionUID = 1L;
@@ -176,19 +225,19 @@ public class Serializers {
super(ArrayList.class);
}
}
+
/**
* Special serializer for Java collections enforcing certain instance types.
* Avro is serializing collections with a "GenericData.Array" type. Kryo is not able to handle
* this type, so we use ArrayLists.
*/
@SuppressWarnings("rawtypes")
- public static class SpecificInstanceCollectionSerializer<T extends Collection>
- extends CollectionSerializer implements Serializable
- {
+ public static class SpecificInstanceCollectionSerializer<T extends Collection>
+ extends CollectionSerializer implements Serializable {
private static final long serialVersionUID = 1L;
-
+
private Class<T> type;
-
+
public SpecificInstanceCollectionSerializer(Class<T> type) {
this.type = type;
}
@@ -203,27 +252,4 @@ public class Serializers {
return kryo.newInstance(this.type);
}
}
-
- /**
- * Slow serialization approach for Avro schemas.
- * This is only used with {{@link org.apache.avro.generic.GenericData.Record}} types.
- * Having this serializer, we are able to handle avro Records.
- */
- public static class AvroSchemaSerializer extends Serializer<Schema> implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void write(Kryo kryo, Output output, Schema object) {
- String schemaAsString = object.toString(false);
- output.writeString(schemaAsString);
- }
-
- @Override
- public Schema read(Kryo kryo, Input input, Class<Schema> type) {
- String schemaAsString = input.readString();
- // the parser seems to be stateful, to we need a new one for every type.
- Schema.Parser sParser = new Schema.Parser();
- return sParser.parse(schemaAsString);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
deleted file mode 100644
index 5b08e52..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericArraySerializerTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericArraySerializerTest extends AbstractGenericArraySerializerTest {
- @Override
- protected <T> TypeSerializer<T> createComponentSerializer(Class<T> type) {
- return new AvroSerializer<T>(type);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
deleted file mode 100644
index 19fac43..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeComparatorTest.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeComparatorTest extends AbstractGenericTypeComparatorTest {
- @Override
- protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
- return new AvroSerializer<T>(type);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
deleted file mode 100644
index df1ff60..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroGenericTypeSerializerTest.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-
-public class AvroGenericTypeSerializerTest extends AbstractGenericTypeSerializerTest {
-
- @Override
- protected <T> TypeSerializer<T> createSerializer(Class<T> type) {
- return new AvroSerializer<T>(type);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
deleted file mode 100644
index 8a89410..0000000
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializerEmptyArrayTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.typeutils.runtime;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.avro.reflect.Nullable;
-import org.apache.flink.api.common.typeutils.SerializerTestInstance;
-import org.junit.Test;
-
-public class AvroSerializerEmptyArrayTest {
-
- @Test
- public void testBookSerialization() {
- try {
- Book b = new Book(123, "This is a test book", 26382648);
- AvroSerializer<Book> serializer = new AvroSerializer<Book>(Book.class);
- SerializerTestInstance<Book> test = new SerializerTestInstance<Book>(serializer, Book.class, -1, b);
- test.testAll();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerialization() {
- try {
- List<String> titles = new ArrayList<String>();
-
- List<Book> books = new ArrayList<Book>();
- books.add(new Book(123, "This is a test book", 1));
- books.add(new Book(24234234, "This is a test book", 1));
- books.add(new Book(1234324, "This is a test book", 3));
-
- BookAuthor a = new BookAuthor(1, titles, "Test Author");
- a.books = books;
- a.bookType = BookAuthor.BookType.journal;
-
- AvroSerializer<BookAuthor> serializer = new AvroSerializer<BookAuthor>(BookAuthor.class);
-
- SerializerTestInstance<BookAuthor> test = new SerializerTestInstance<BookAuthor>(serializer, BookAuthor.class, -1, a);
- test.testAll();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- public static class Book {
-
- long bookId;
- @Nullable
- String title;
- long authorId;
-
- public Book() {}
-
- public Book(long bookId, String title, long authorId) {
- this.bookId = bookId;
- this.title = title;
- this.authorId = authorId;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (authorId ^ (authorId >>> 32));
- result = prime * result + (int) (bookId ^ (bookId >>> 32));
- result = prime * result + ((title == null) ? 0 : title.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Book other = (Book) obj;
- if (authorId != other.authorId)
- return false;
- if (bookId != other.bookId)
- return false;
- if (title == null) {
- if (other.title != null)
- return false;
- } else if (!title.equals(other.title))
- return false;
- return true;
- }
- }
-
- public static class BookAuthor {
-
- enum BookType {
- book,
- article,
- journal
- }
-
- long authorId;
-
- @Nullable
- List<String> bookTitles;
-
- @Nullable
- List<Book> books;
-
- String authorName;
-
- BookType bookType;
-
- public BookAuthor() {}
-
- public BookAuthor(long authorId, List<String> bookTitles, String authorName) {
- this.authorId = authorId;
- this.bookTitles = bookTitles;
- this.authorName = authorName;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (authorId ^ (authorId >>> 32));
- result = prime * result + ((authorName == null) ? 0 : authorName.hashCode());
- result = prime * result + ((bookTitles == null) ? 0 : bookTitles.hashCode());
- result = prime * result + ((bookType == null) ? 0 : bookType.hashCode());
- result = prime * result + ((books == null) ? 0 : books.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- BookAuthor other = (BookAuthor) obj;
- if (authorId != other.authorId)
- return false;
- if (authorName == null) {
- if (other.authorName != null)
- return false;
- } else if (!authorName.equals(other.authorName))
- return false;
- if (bookTitles == null) {
- if (other.bookTitles != null)
- return false;
- } else if (!bookTitles.equals(other.bookTitles))
- return false;
- if (bookType != other.bookType)
- return false;
- if (books == null) {
- if (other.books != null)
- return false;
- } else if (!books.equals(other.books))
- return false;
- return true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
index 5a404bd..1cacc9e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializerCompatibilityTest.java
@@ -18,20 +18,22 @@
package org.apache.flink.api.java.typeutils.runtime.kryo;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -42,6 +44,20 @@ import static org.junit.Assert.assertTrue;
*/
public class KryoSerializerCompatibilityTest {
+ /**
+ * Verifies that a serializer config snapshot read from the test resource
+ * "kryo-serializer-flink1.3-snapshot" does not require migration for the current serializer.
+ *
+ * <p>NOTE(review): judging by its name, the resource was presumably written by Flink 1.3,
+ * when the Avro classes were still part of the Kryo registrations - confirm.
+ */
+ @Test
+ public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
+ KryoSerializer<TestClass> kryoSerializer = new KryoSerializer<>(TestClass.class, new ExecutionConfig());
+
+ // read the serializer configuration snapshot from the test resources
+ TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot;
+ try (InputStream in = getClass().getResourceAsStream("/kryo-serializer-flink1.3-snapshot")) {
+ // getResourceAsStream returns null for a missing resource; fail clearly instead of with an NPE
+ assertTrue("Test resource '/kryo-serializer-flink1.3-snapshot' not found on the classpath.", in != null);
+ kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
+ new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
+ }
+ CompatibilityResult<TestClass> compatResult = kryoSerializer.ensureCompatibility(kryoSerializerConfigSnapshot);
+ assertFalse(compatResult.isRequiresMigration());
+ }
+
/**
* Verifies that reconfiguration result is INCOMPATIBLE if data type has changed.
*/
@@ -60,7 +76,7 @@ public class KryoSerializerCompatibilityTest {
KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
// read configuration again from bytes
- try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
@@ -103,7 +119,7 @@ public class KryoSerializerCompatibilityTest {
kryoSerializer = new KryoSerializer<>(TestClass.class, executionConfig);
// read configuration from bytes
- try(ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
+ try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/537a10ea/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot
new file mode 100644
index 0000000..0123a9c
Binary files /dev/null and b/flink-core/src/test/resources/kryo-serializer-flink1.3-snapshot differ