You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/05/17 13:35:14 UTC

[flink] branch master updated: [FLINK-17757][avro] Implement format factory for Avro serialization and deserialization schema of RowData type

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e2f2fd1  [FLINK-17757][avro] Implement format factory for Avro serialization and deserialization schema of RowData type
e2f2fd1 is described below

commit e2f2fd1aa9b25f4818886afd2263a8d1e7549e34
Author: yuzhao.cyz <yu...@gmail.com>
AuthorDate: Sat May 16 16:17:05 2020 +0800

    [FLINK-17757][avro] Implement format factory for Avro serialization and deserialization schema of RowData type
    
    This closes #12190
---
 .../flink/formats/avro/AvroFormatFactory.java      | 112 +++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  16 +++
 .../flink/formats/avro/AvroFormatFactoryTest.java  | 124 +++++++++++++++++++++
 3 files changed, 252 insertions(+)

diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
new file mode 100644
index 0000000..dbb2d7e
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.ScanFormat;
+import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * Table format factory for providing configured instances of Avro to RowData {@link SerializationSchema}
+ * and {@link DeserializationSchema}.
+ */
+public class AvroFormatFactory implements
+		DeserializationFormatFactory,
+		SerializationFormatFactory {
+
+	public static final String IDENTIFIER = "avro";
+
+	@Override
+	public ScanFormat<DeserializationSchema<RowData>> createScanFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatOptions) {
+		FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+		return new ScanFormat<DeserializationSchema<RowData>>() {
+			@Override
+			public DeserializationSchema<RowData> createScanFormat(
+					ScanTableSource.Context scanContext,
+					DataType producedDataType) {
+				final RowType rowType = (RowType) producedDataType.getLogicalType();
+				final TypeInformation<RowData> rowDataTypeInfo =
+						(TypeInformation<RowData>) scanContext.createTypeInformation(producedDataType);
+				return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo);
+			}
+
+			@Override
+			public ChangelogMode getChangelogMode() {
+				return ChangelogMode.insertOnly();
+			}
+		};
+	}
+
+	@Override
+	public SinkFormat<SerializationSchema<RowData>> createSinkFormat(
+			DynamicTableFactory.Context context,
+			ReadableConfig formatOptions) {
+		FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+		return new SinkFormat<SerializationSchema<RowData>>() {
+			@Override
+			public SerializationSchema<RowData> createSinkFormat(
+					DynamicTableSink.Context context,
+					DataType consumedDataType) {
+				final RowType rowType = (RowType) consumedDataType.getLogicalType();
+				return new AvroRowDataSerializationSchema(rowType);
+			}
+
+			@Override
+			public ChangelogMode getChangelogMode() {
+				return ChangelogMode.insertOnly();
+			}
+		};
+	}
+
+	@Override
+	public String factoryIdentifier() {
+		return IDENTIFIER;
+	}
+
+	@Override
+	public Set<ConfigOption<?>> requiredOptions() {
+		return Collections.emptySet();
+	}
+
+	@Override
+	public Set<ConfigOption<?>> optionalOptions() {
+		return Collections.emptySet();
+	}
+}
diff --git a/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000..e456efc
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.formats.avro.AvroFormatFactory
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
new file mode 100644
index 0000000..d1c1156
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFormatFactoryTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.TestDynamicTableFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link AvroFormatFactory}.
+ */
+public class AvroFormatFactoryTest extends TestLogger {
+
+	private static final TableSchema SCHEMA = TableSchema.builder()
+			.field("a", DataTypes.STRING())
+			.field("b", DataTypes.INT())
+			.field("c", DataTypes.BOOLEAN())
+			.build();
+
+	private static final RowType ROW_TYPE = (RowType) SCHEMA.toRowDataType().getLogicalType();
+
+	/**
+	 * Checks that the schemas created through the factory (selected via the {@code format} option)
+	 * are equal to directly constructed Avro RowData (de)serialization schemas.
+	 */
+	@Test
+	public void testSeDeSchema() {
+		final Map<String, String> options = getAllOptions();
+
+		// ---- deserialization ----
+		final AvroRowDataDeserializationSchema expectedDeser =
+				new AvroRowDataDeserializationSchema(ROW_TYPE, new RowDataTypeInfo(ROW_TYPE));
+
+		final DynamicTableSource actualSource = createTableSource(options);
+		// JUnit assertion instead of the `assert` keyword: it always runs and reports the actual value.
+		assertTrue(
+				"Unexpected table source: " + actualSource,
+				actualSource instanceof TestDynamicTableFactory.DynamicTableSourceMock);
+		final TestDynamicTableFactory.DynamicTableSourceMock scanSourceMock =
+				(TestDynamicTableFactory.DynamicTableSourceMock) actualSource;
+
+		final DeserializationSchema<RowData> actualDeser = scanSourceMock.sourceValueFormat
+				.createScanFormat(
+						ScanRuntimeProviderContext.INSTANCE,
+						SCHEMA.toRowDataType());
+
+		assertEquals(expectedDeser, actualDeser);
+
+		// ---- serialization ----
+		final AvroRowDataSerializationSchema expectedSer =
+				new AvroRowDataSerializationSchema(ROW_TYPE);
+
+		final DynamicTableSink actualSink = createTableSink(options);
+		assertTrue(
+				"Unexpected table sink: " + actualSink,
+				actualSink instanceof TestDynamicTableFactory.DynamicTableSinkMock);
+		final TestDynamicTableFactory.DynamicTableSinkMock sinkMock =
+				(TestDynamicTableFactory.DynamicTableSinkMock) actualSink;
+
+		// The Avro sink format only reads the consumed data type, so no sink context is needed.
+		final SerializationSchema<RowData> actualSer = sinkMock.sinkValueFormat
+				.createSinkFormat(
+						null,
+						SCHEMA.toRowDataType());
+
+		assertEquals(expectedSer, actualSer);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	/** Options for the test connector with the value format set to Avro. */
+	private static Map<String, String> getAllOptions() {
+		final Map<String, String> options = new HashMap<>();
+		options.put("connector", TestDynamicTableFactory.IDENTIFIER);
+		options.put("target", "MyTarget");
+		options.put("buffer-size", "1000");
+
+		options.put("format", AvroFormatFactory.IDENTIFIER);
+		return options;
+	}
+
+	private static DynamicTableSource createTableSource(Map<String, String> options) {
+		return FactoryUtil.createTableSource(
+				null,
+				ObjectIdentifier.of("default", "default", "t1"),
+				new CatalogTableImpl(SCHEMA, options, "mock source"),
+				new Configuration(),
+				AvroFormatFactoryTest.class.getClassLoader());
+	}
+
+	private static DynamicTableSink createTableSink(Map<String, String> options) {
+		return FactoryUtil.createTableSink(
+				null,
+				ObjectIdentifier.of("default", "default", "t1"),
+				new CatalogTableImpl(SCHEMA, options, "mock sink"),
+				new Configuration(),
+				AvroFormatFactoryTest.class.getClassLoader());
+	}
+}