You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2020/06/15 10:14:50 UTC
[flink] branch release-1.11 updated: [FLINK-18223] Fixed
AvroSerializer to initialize GenericRecords in the correct way
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new b183422 [FLINK-18223] Fixed AvroSerializer to initialize GenericRecords in the correct way
b183422 is described below
commit b1834221961162c37b14e3185c1249535fbd32ec
Author: Lorenzo Nicora <lo...@contino.io>
AuthorDate: Wed Jun 10 09:56:43 2020 +0100
[FLINK-18223] Fixed AvroSerializer to initialize GenericRecords in the correct way
This closes #12591
---
.../formats/avro/typeutils/AvroSerializer.java | 5 +-
.../typeutils/AvroSerializerGenericRecordTest.java | 59 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 643fb6b..c4253f4 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -25,7 +25,6 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
-import org.apache.flink.util.InstantiationUtil;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -169,8 +168,10 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
// ------------------------------------------------------------------------
@Override
+ @SuppressWarnings("unchecked")
public T createInstance() {
- return InstantiationUtil.instantiate(type);
+ checkAvroInitialized();
+ return (T) avroData.newRecord(null, runtimeSchema);
}
@Override
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java
new file mode 100644
index 0000000..5260c27
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSerializerGenericRecordTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+
+/**
+ * Runs the {@link SerializerTestBase} checks against an {@link AvroSerializer} created for {@link GenericRecord} with an inline schema.
+ */
+public class AvroSerializerGenericRecordTest extends SerializerTestBase<GenericRecord> {
+
+ private static final Schema SCHEMA = new org.apache.avro.Schema.Parser() // record "Dummy" (namespace "dummy") with one string field "afield"
+ .parse("{\"type\":\"record\",\"name\":\"Dummy\",\"namespace\":\"dummy\",\"fields\": "
+ + "[{\"name\":\"afield\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}");
+
+ @Override
+ protected TypeSerializer<GenericRecord> createSerializer() {
+ return new AvroSerializer<>(GenericRecord.class, SCHEMA); // serializer under test, bound to SCHEMA
+ }
+
+ @Override
+ protected int getLength() {
+ return -1; // NOTE(review): presumably tells the base test the serialized length is not fixed - confirm against SerializerTestBase
+ }
+
+ @Override
+ protected Class<GenericRecord> getTypeClass() {
+ return GenericRecord.class;
+ }
+
+ @Override
+ protected GenericRecord[] getTestData() {
+ return new GenericRecord[]{
+ new GenericRecordBuilder(SCHEMA)
+ .set("afield", "foo bar")
+ .build()}; // a single sample record with SCHEMA's only field populated
+ }
+}