You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/15 10:14:50 UTC

[flink] branch release-1.11 updated: [FLINK-18223] Fixed AvroSerializer to initialize GenericRecords in the correct way

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new b183422  [FLINK-18223] Fixed AvroSerializer to initialize GenericRecords in the correct way
b183422 is described below

commit b1834221961162c37b14e3185c1249535fbd32ec
Author: Lorenzo Nicora <lo...@contino.io>
AuthorDate: Wed Jun 10 09:56:43 2020 +0100

    [FLINK-18223] Fixed AvroSerializer to initialize GenericRecords in the correct way
    
    This closes #12591
---
 .../formats/avro/typeutils/AvroSerializer.java     |  5 +-
 .../typeutils/AvroSerializerGenericRecordTest.java | 59 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 643fb6b..c4253f4 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.formats.avro.utils.DataInputDecoder;
 import org.apache.flink.formats.avro.utils.DataOutputEncoder;
-import org.apache.flink.util.InstantiationUtil;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
@@ -169,8 +168,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	// ------------------------------------------------------------------------
 
 	@Override
+	@SuppressWarnings("unchecked") // covers the (T) cast applied to the newRecord result below
 	public T createInstance() {
-		return InstantiationUtil.instantiate(type);
+		checkAvroInitialized(); // NOTE(review): presumably prepares avroData and runtimeSchema before they are used — confirm against the helper
+		return (T) avroData.newRecord(null, runtimeSchema); // record is created from the runtime schema instead of instantiating the type class
 	}
 
 	@Override
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java
new file mode 100644
index 0000000..5260c27
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+
+/**
+ * Runs the {@link SerializerTestBase} suite on an {@link AvroSerializer} built for {@link GenericRecord} with a fixed schema.
+ */
+public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericRecord> {
+
+	private static final Schema SCHEMA = new org.apache.avro.Schema.Parser() // record "Dummy" in namespace "dummy" with one string field "afield"
+		.parse("{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": "
+			+ "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
+
+	@Override
+	protected TypeSerializer<GenericRecord> createSerializer() {
+		return new AvroSerializer<>(GenericRecord.class, SCHEMA); // serializer under test: GenericRecord class plus the explicit schema
+	}
+
+	@Override
+	protected int getLength() {
+		return -1; // NOTE(review): presumably marks the serialized length as variable — confirm against SerializerTestBase
+	}
+
+	@Override
+	protected Class<GenericRecord> getTypeClass() {
+		return GenericRecord.class;
+	}
+
+	@Override
+	protected GenericRecord[] getTestData() {
+		return new GenericRecord[]{ // a single sample record that sets the schema's only field
+			new GenericRecordBuilder(SCHEMA)
+				.set("afield", "foo bar")
+				.build()};
+	}
+}