Posted to commits@flink.apache.org by dw...@apache.org on 2018/05/23 14:33:42 UTC

[1/2] flink git commit: [FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided implementation for Confluent Schema Registry

Repository: flink
Updated Branches:
  refs/heads/master c10e03ff2 -> fed284b14


[FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided implementation for Confluent Schema Registry

This closes #5995


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fed284b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fed284b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fed284b1

Branch: refs/heads/master
Commit: fed284b140459b27e25e651650448eae1d110f0f
Parents: fd101be
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri May 11 18:57:26 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed May 23 16:31:42 2018 +0200

----------------------------------------------------------------------
 .../flink-avro-confluent-registry/pom.xml       |  94 +++++++++++++
 ...fluentRegistryAvroDeserializationSchema.java | 136 +++++++++++++++++++
 .../confluent/ConfluentSchemaRegistryCoder.java |  67 +++++++++
 .../ConfluentSchemaRegistryCoderTest.java       |  80 +++++++++++
 .../avro/RegistryAvroDeserializationSchema.java |  84 ++++++++++++
 .../apache/flink/formats/avro/SchemaCoder.java  |  48 +++++++
 .../RegistryAvroDeserializationSchemaTest.java  | 103 ++++++++++++++
 flink-formats/pom.xml                           |   1 +
 8 files changed, 613 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml
new file mode 100644
index 0000000..b421ebf
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-formats</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.6-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-avro-confluent-registry</artifactId>
+
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>http://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
+	<dependencies>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-schema-registry-client</artifactId>
+			<version>3.3.1</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<relocations combine.children="append">
+								<relocation>
+									<pattern>com.fasterxml.jackson.core</pattern>
+									<shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
new file mode 100644
index 0000000..1135bb9
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using a {@link SchemaCoder}
+ * backed by the Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
+
+	private static final long serialVersionUID = -1671641202177852775L;
+
+	/**
+	 * Creates an Avro deserialization schema.
+	 *
+	 * @param recordClazz         class into which to deserialize. Should be either
+	 *                            {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader              reader's Avro schema. Must be provided if recordClazz is
+	 *                            {@link GenericRecord}.
+	 * @param schemaCoderProvider provider for the schema coder that reads the writer schema from the Confluent Schema Registry
+	 */
+	private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader, schemaCoderProvider);
+	}
+
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using the provided reader schema and looks up the writer schema in the Confluent Schema Registry.
+	 *
+	 * @param schema schema of produced records
+	 * @param url    URL of the schema registry to connect to
+	 * @return deserialization schema that produces {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+	}
+
+	/**
+	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+	 * using the provided reader schema and looks up the writer schema in the Confluent Schema Registry.
+	 *
+	 * @param schema              schema of produced records
+	 * @param url                 URL of the schema registry to connect to
+	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+	 * @return deserialization schema that produces {@link GenericRecord}
+	 */
+	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
+			int identityMapCapacity) {
+		return new ConfluentRegistryAvroDeserializationSchema<>(
+			GenericRecord.class,
+			schema,
+			new CachedSchemaCoderProvider(url, identityMapCapacity));
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from an Avro
+	 * schema and looks up the writer schema in the Confluent Schema Registry.
+	 *
+	 * @param tClass class of record to be produced
+	 * @param url    URL of the schema registry to connect to
+	 * @return deserialization schema that produces instances of {@code tClass}
+	 */
+	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
+			String url) {
+		return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from an Avro
+	 * schema and looks up the writer schema in the Confluent Schema Registry.
+	 *
+	 * @param tClass              class of record to be produced
+	 * @param url                 URL of the schema registry to connect to
+	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+	 * @return deserialization schema that produces instances of {@code tClass}
+	 */
+	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
+			String url, int identityMapCapacity) {
+		return new ConfluentRegistryAvroDeserializationSchema<>(
+			tClass,
+			null,
+			new CachedSchemaCoderProvider(url, identityMapCapacity)
+		);
+	}
+
+	private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
+
+		private static final long serialVersionUID = 4023134423033312666L;
+		private final String url;
+		private final int identityMapCapacity;
+
+		CachedSchemaCoderProvider(String url, int identityMapCapacity) {
+			this.url = url;
+			this.identityMapCapacity = identityMapCapacity;
+		}
+
+		@Override
+		public SchemaCoder get() {
+			return new ConfluentSchemaRegistryCoder(new CachedSchemaRegistryClient(
+				url,
+				identityMapCapacity));
+		}
+	}
+}
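
A minimal usage sketch (not part of this commit), wiring the new schema into a
Kafka source. The topic name, registry URL, and the flink-connector-kafka-0.11
consumer are illustrative assumptions:

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class ConfluentAvroSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Schema readerSchema = SchemaBuilder.record("User")
                .fields()
                .requiredString("name")
                .endRecord();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092");
            props.setProperty("group.id", "demo");

            DataStream<GenericRecord> stream = env.addSource(
                new FlinkKafkaConsumer011<>(
                    "users",  // assumed topic name
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        readerSchema, "http://localhost:8081"),  // assumed registry URL
                    props));

            stream.print();
            env.execute("confluent-avro-sketch");
        }
    }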

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
new file mode 100644
index 0000000..1f2dc69
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static java.lang.String.format;
+
+/**
+ * Reads the writer schema using the Confluent Schema Registry protocol.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+	private final SchemaRegistryClient schemaRegistryClient;
+
+	/**
+	 * Creates {@link SchemaCoder} that uses the provided {@link SchemaRegistryClient} to connect to
+	 * the schema registry.
+	 *
+	 * @param schemaRegistryClient client used to connect to the schema registry
+	 */
+	public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
+		this.schemaRegistryClient = schemaRegistryClient;
+	}
+
+	@Override
+	public Schema readSchema(InputStream in) throws IOException {
+		DataInputStream dataInputStream = new DataInputStream(in);
+
+		if (dataInputStream.readByte() != 0) {
+			throw new IOException("Unknown data format. Magic number does not match");
+		} else {
+			int schemaId = dataInputStream.readInt();
+
+			try {
+				return schemaRegistryClient.getById(schemaId);
+			} catch (RestClientException e) {
+				throw new IOException(format("Could not find schema with id %s in registry", schemaId), e);
+			}
+		}
+	}
+
+}
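
For reference, a sketch (not part of this commit) of producing the wire format this
coder consumes: a zero magic byte, a 4-byte big-endian schema id, then the Avro
binary body. The helper class below is hypothetical:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public final class ConfluentWireFormatSketch {
        /** Encodes a record in the format readSchema() above expects. */
        public static byte[] encode(int schemaId, GenericRecord record, Schema schema)
                throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DataOutputStream dataOut = new DataOutputStream(out);
            dataOut.writeByte(0);        // magic byte checked by readSchema()
            dataOut.writeInt(schemaId);  // id that getById() resolves in the registry
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        }
    }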

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
new file mode 100644
index 0000000..01e807c
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConfluentSchemaRegistryCoder}.
+ */
+public class ConfluentSchemaRegistryCoderTest {
+
+	@Test
+	public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
+		MockSchemaRegistryClient client = new MockSchemaRegistryClient();
+
+		Schema schema = SchemaBuilder.record("testRecord")
+			.fields()
+			.optionalString("testField")
+			.endRecord();
+		int schemaId = client.register("testTopic", schema);
+
+		ConfluentSchemaRegistryCoder registryCoder = new ConfluentSchemaRegistryCoder(client);
+		ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+		DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
+		dataOutputStream.writeByte(0);
+		dataOutputStream.writeInt(schemaId);
+		dataOutputStream.flush();
+
+		ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+		Schema readSchema = registryCoder.readSchema(byteInStream);
+
+		assertEquals(schema, readSchema);
+		assertEquals(0, byteInStream.available());
+	}
+
+	@Test(expected = IOException.class)
+	public void testMagicByteVerification() throws Exception {
+		MockSchemaRegistryClient client = new MockSchemaRegistryClient();
+		int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));
+
+		ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(client);
+		ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+		DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
+		dataOutputStream.writeByte(5);
+		dataOutputStream.writeInt(schemaId);
+		dataOutputStream.flush();
+
+		ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+		coder.readSchema(byteInStream);
+
+		// exception is thrown
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
new file mode 100644
index 0000000..f145a75
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+	private static final long serialVersionUID = -884738268437806062L;
+
+	/** Provider for schema coder. Used for initializing in each task. */
+	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+	/** Coder used for reading schema from incoming stream. */
+	private transient SchemaCoder schemaCoder;
+
+	/**
+	 * Creates an Avro deserialization schema that reads the writer schema from the input stream using the provided {@link SchemaCoder}.
+	 *
+	 * @param recordClazz         class into which to deserialize. Should be either
+	 *                            {@link SpecificRecord} or {@link GenericRecord}.
+	 * @param reader              reader's Avro schema. Must be provided if recordClazz is
+	 *                            {@link GenericRecord}.
+	 * @param schemaCoderProvider provider that creates the {@link SchemaCoder} used for
+	 *                            reading the writer schema
+	 */
+	protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+		super(recordClazz, reader);
+		this.schemaCoderProvider = schemaCoderProvider;
+		this.schemaCoder = schemaCoderProvider.get();
+	}
+
+	@Override
+	public T deserialize(byte[] message) throws IOException {
+		checkAvroInitialized();
+		getInputStream().setBuffer(message);
+		Schema writerSchema = schemaCoder.readSchema(getInputStream());
+		Schema readerSchema = getReaderSchema();
+
+		GenericDatumReader<T> datumReader = getDatumReader();
+
+		datumReader.setSchema(writerSchema);
+		datumReader.setExpected(readerSchema);
+
+		return datumReader.read(null, getDecoder());
+	}
+
+	@Override
+	void checkAvroInitialized() {
+		super.checkAvroInitialized();
+		if (schemaCoder == null) {
+			this.schemaCoder = schemaCoderProvider.get();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
new file mode 100644
index 0000000..e194f77
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * Schema coder that allows reading a writer schema that is embedded in the serialized record.
+ * Used by {@link RegistryAvroDeserializationSchema}.
+ */
+public interface SchemaCoder {
+	Schema readSchema(InputStream in) throws IOException;
+
+	/**
+	 * Provider for {@link SchemaCoder}. It allows each parallel operator instance to create
+	 * its own (potentially non-serializable) client rather than serializing a shared one.
+	 */
+	interface SchemaCoderProvider extends Serializable {
+
+		/**
+		 * Creates a new instance of {@link SchemaCoder}. A fresh instance should be created
+		 * on every call, as this method is invoked on multiple nodes.
+		 *
+		 * @return new {@link SchemaCoder} instance
+		 */
+		SchemaCoder get();
+	}
+}
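
The interface is small enough that registry-free implementations fit in a few
lines. A hypothetical coder (not part of this commit) that expects the writer
schema embedded as a length-prefixed JSON string in front of every record:

    import java.io.DataInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.avro.Schema;
    import org.apache.flink.formats.avro.SchemaCoder;

    public class EmbeddedSchemaCoder implements SchemaCoder {
        @Override
        public Schema readSchema(InputStream in) throws IOException {
            // readUTF() consumes a 2-byte length plus that many bytes, leaving the
            // stream positioned at the start of the Avro binary body.
            return new Schema.Parser().parse(new DataInputStream(in).readUTF());
        }
    }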

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
new file mode 100644
index 0000000..4c14d53
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.SimpleRecord;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link RegistryAvroDeserializationSchema}.
+ */
+public class RegistryAvroDeserializationSchemaTest {
+
+	private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
+
+	@Test
+	public void testGenericRecordReadWithCompatibleSchema() throws IOException {
+		RegistryAvroDeserializationSchema<GenericRecord> deserializer = new RegistryAvroDeserializationSchema<>(
+			GenericRecord.class,
+			SchemaBuilder.record("Address")
+				.fields()
+				.requiredString("street")
+				.requiredInt("num")
+				.optionalString("country")
+				.endRecord(),
+			() -> new SchemaCoder() {
+				@Override
+				public Schema readSchema(InputStream in) {
+					return Address.getClassSchema();
+				}
+			}
+		);
+
+		GenericRecord genericRecord = deserializer.deserialize(writeRecord(
+			address,
+			Address.getClassSchema()));
+		assertEquals(address.getNum(), genericRecord.get("num"));
+		assertEquals(address.getStreet(), genericRecord.get("street").toString());
+		assertNull(genericRecord.get("city"));
+		assertNull(genericRecord.get("state"));
+		assertNull(genericRecord.get("zip"));
+		assertNull(genericRecord.get("country"));
+	}
+
+	@Test
+	public void testSpecificRecordReadMoreFieldsThanWereWritten() throws IOException {
+		Schema smallerUserSchema = new Schema.Parser().parse(
+			"{\"namespace\": \"org.apache.flink.formats.avro.generated\",\n" +
+				" \"type\": \"record\",\n" +
+				" \"name\": \"SimpleRecord\",\n" +
+				" \"fields\": [\n" +
+				"     {\"name\": \"name\", \"type\": \"string\"}" +
+				" ]\n" +
+				"}");
+		RegistryAvroDeserializationSchema<SimpleRecord> deserializer = new RegistryAvroDeserializationSchema<>(
+			SimpleRecord.class,
+			null,
+			() -> in -> smallerUserSchema
+		);
+
+		GenericData.Record smallUser = new GenericRecordBuilder(smallerUserSchema)
+			.set("name", "someName")
+			.build();
+
+		SimpleRecord simpleRecord = deserializer.deserialize(writeRecord(
+			smallUser,
+			smallerUserSchema));
+
+		assertEquals("someName", simpleRecord.getName().toString());
+		assertNull(simpleRecord.getOptionalField());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 7cb67e8..2fb2c67 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -38,6 +38,7 @@ under the License.
 	<modules>
 		<module>flink-avro</module>
 		<module>flink-json</module>
+		<module>flink-avro-confluent-registry</module>
 	</modules>
 
 	<!-- override these root dependencies as 'provided', so they don't end up


[2/2] flink git commit: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dw...@apache.org.
[FLINK-9337] Implemented AvroDeserializationSchema


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd101be1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd101be1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd101be1

Branch: refs/heads/master
Commit: fd101be18cb6f386196e19178ba4da7f1f473fc5
Parents: c10e03f
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri May 11 18:55:10 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed May 23 16:31:42 2018 +0200

----------------------------------------------------------------------
 .../formats/avro/AvroDeserializationSchema.java | 172 +++++++++++++++++++
 .../avro/AvroRowDeserializationSchema.java      |  25 +--
 .../formats/avro/typeutils/AvroSerializer.java  |  77 +++++++--
 .../typeutils/GenericRecordAvroTypeInfo.java    | 115 +++++++++++++
 .../avro/utils/MutableByteArrayInputStream.java |  45 +++++
 .../avro/AvroDeserializationSchemaTest.java     |  62 +++++++
 .../BackwardsCompatibleAvroSerializerTest.java  |   6 +-
 .../flink/formats/avro/utils/AvroTestUtils.java |  21 +++
 .../formats/avro/utils/TestDataGenerator.java   |   2 +-
 .../src/test/resources/avro/user.avsc           |   8 +
 .../flink-1.3-avro-type-serialized-data         | Bin 23926 -> 23829 bytes
 .../flink-1.3-avro-type-serializer-snapshot     | Bin 48089 -> 33772 bytes
 12 files changed, 490 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
new file mode 100644
index 0000000..2d06476
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using the provided schema.
+	 *
+	 * @param schema schema of produced records
+	 * @return deserialization schema that produces {@link GenericRecord}
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from an Avro schema.
+	 *
+	 * @param tClass class of record to be produced
+	 * @return deserialization schema that produces instances of {@code tClass}
+	 */
+	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
+		return new AvroDeserializationSchema<>(tClass, null);
+	}
+
+	private static final long serialVersionUID = -6766681879020862312L;
+
+	/** Class to deserialize to. */
+	private final Class<T> recordClazz;
+
+	/** Schema string for the GenericRecord case, kept so the schema survives Java serialization. */
+	private final String schemaString;
+
+	/** Reader that deserializes byte array into a record. */
+	private transient GenericDatumReader<T> datumReader;
+
+	/** Input stream to read message from. */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/** Avro decoder that decodes binary data. */
+	private transient Decoder decoder;
+
+	/** Avro schema for the reader. */
+	private transient Schema reader;
+
+	/**
+	 * Creates an Avro deserialization schema.
+	 *
+	 * @param recordClazz class into which to deserialize. Should be one of:
+	 *                    {@link org.apache.avro.specific.SpecificRecord},
+	 *                    {@link org.apache.avro.generic.GenericRecord}.
+	 * @param reader      reader's Avro schema. Must be provided if recordClazz is
+	 *                    {@link GenericRecord}.
+	 */
+	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.reader = reader;
+		if (reader != null) {
+			this.schemaString = reader.toString();
+		} else {
+			this.schemaString = null;
+		}
+	}
+
+	GenericDatumReader<T> getDatumReader() {
+		return datumReader;
+	}
+
+	Schema getReaderSchema() {
+		return reader;
+	}
+
+	MutableByteArrayInputStream getInputStream() {
+		return inputStream;
+	}
+
+	Decoder getDecoder() {
+		return decoder;
+	}
+
+	@Override
+	public T deserialize(byte[] message) throws IOException {
+		// read record
+		checkAvroInitialized();
+		inputStream.setBuffer(message);
+		Schema readerSchema = getReaderSchema();
+		GenericDatumReader<T> datumReader = getDatumReader();
+
+		datumReader.setSchema(readerSchema);
+
+		return datumReader.read(null, decoder);
+	}
+
+	void checkAvroInitialized() {
+		if (datumReader != null) {
+			return;
+		}
+
+		ClassLoader cl = Thread.currentThread().getContextClassLoader();
+		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
+			SpecificData specificData = new SpecificData(cl);
+			this.datumReader = new SpecificDatumReader<>(specificData);
+			this.reader = specificData.getSchema(recordClazz);
+		} else {
+			this.reader = new Schema.Parser().parse(schemaString);
+			GenericData genericData = new GenericData(cl);
+			this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
+		}
+
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return false;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public TypeInformation<T> getProducedType() {
+		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
+			return new AvroTypeInfo(recordClazz, false);
+		} else {
+			return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
+		}
+	}
+}
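
A short usage sketch (not part of this commit); Address is the Avro-generated
test class used elsewhere in this change, and avroBytes stands for raw Avro
binary input:

    // Specific records: the schema is derived from the generated class.
    DeserializationSchema<Address> specific =
        AvroDeserializationSchema.forSpecific(Address.class);
    Address addr = specific.deserialize(avroBytes);

    // Generic records: the reader schema must be supplied explicitly.
    DeserializationSchema<GenericRecord> generic =
        AvroDeserializationSchema.forGeneric(Address.getClassSchema());
    GenericRecord rec = generic.deserialize(avroBytes);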

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 4a3c02e..a8422a4 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -18,6 +18,7 @@
 package org.apache.flink.formats.avro;
 
 import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
@@ -31,7 +32,6 @@ import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.avro.util.Utf8;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -153,27 +153,4 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
 		}
 	}
 
-	/**
-	 * An extension of the ByteArrayInputStream that allows to change a buffer that should be
-	 * read without creating a new ByteArrayInputStream instance. This allows to re-use the same
-	 * InputStream instance, copying message to process, and creation of Decoder on every new message.
-	 */
-	private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
-
-		public MutableByteArrayInputStream() {
-			super(new byte[0]);
-		}
-
-		/**
-		 * Set buffer that can be read via the InputStream interface and reset the input stream.
-		 * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
-		 *
-		 * @param buf the new buffer to read.
-		 */
-		public void setBuffer(byte[] buf) {
-			this.buf = buf;
-			this.pos = 0;
-			this.count = buf.length;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 75f2988..b313625 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -32,6 +32,10 @@ import org.apache.avro.Schema;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
 import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
@@ -44,15 +48,19 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A serializer that serializes types via Avro.
  *
- * <p>The serializer supports both efficient specific record serialization for
- * types generated via Avro, as well as serialization via reflection
- * (ReflectDatumReader / -Writer). The serializer instantiates them depending on
- * the class of the type it should serialize.
+ * <p>The serializer supports:
+ * <ul>
+ * <li>efficient specific record serialization for types generated via Avro</li>
+ * <li>serialization via reflection (ReflectDatumReader / -Writer)</li>
+ * <li>serialization of generic records via GenericDatumReader / -Writer</li>
+ * </ul>
+ * The serializer instantiates them depending on the class of the type it should serialize.
  *
  * <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders
  * and decoders which have buffers that would be shared between the threads if used concurrently
@@ -77,15 +85,17 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	/** The class of the type that is serialized by this serializer. */
 	private final Class<T> type;
 
+	private final String schemaString;
+
 	// -------- runtime fields, non-serializable, lazily initialized -----------
 
-	private transient SpecificDatumWriter<T> writer;
-	private transient SpecificDatumReader<T> reader;
+	private transient GenericDatumWriter<T> writer;
+	private transient GenericDatumReader<T> reader;
 
 	private transient DataOutputEncoder encoder;
 	private transient DataInputDecoder decoder;
 
-	private transient SpecificData avroData;
+	private transient GenericData avroData;
 
 	private transient Schema schema;
 
@@ -99,9 +109,28 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 
 	/**
 	 * Creates a new AvroSerializer for the type indicated by the given class.
+	 * This constructor is intended to be used with {@link SpecificRecord} or reflection-based serialization.
+	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}.
 	 */
 	public AvroSerializer(Class<T> type) {
+		checkArgument(!isGenericRecord(type),
+			"For GenericData.Record use constructor with explicit schema.");
+		this.type = checkNotNull(type);
+		this.schemaString = null;
+	}
+
+	/**
+	 * Creates a new AvroSerializer for the type indicated by the given class.
+	 * This constructor is expected to be used only with {@link GenericData.Record}.
+	 * For {@link SpecificRecord} or reflection-based serialization use
+	 * {@link AvroSerializer#AvroSerializer(Class)}.
+	 */
+	public AvroSerializer(Class<T> type, Schema schema) {
+		checkArgument(isGenericRecord(type),
+			"For classes other than GenericData.Record use constructor without explicit schema.");
 		this.type = checkNotNull(type);
+		this.schema = checkNotNull(schema);
+		this.schemaString = schema.toString();
 	}
 
 	/**
@@ -275,9 +304,19 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
+	private static boolean isGenericRecord(Class<?> type) {
+		return !SpecificRecord.class.isAssignableFrom(type) &&
+			GenericRecord.class.isAssignableFrom(type);
+	}
+
 	@Override
 	public TypeSerializer<T> duplicate() {
-		return new AvroSerializer<>(type);
+		if (schemaString != null) {
+			// the Schema field is transient; re-parse from schemaString so that
+			// duplicate() also works after this serializer has itself been serialized
+			return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
+		} else {
+			return new AvroSerializer<>(type);
+		}
 	}
 
 	@Override
@@ -323,15 +362,23 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
 		final ClassLoader cl = Thread.currentThread().getContextClassLoader();
 
 		if (SpecificRecord.class.isAssignableFrom(type)) {
-			this.avroData = new SpecificData(cl);
-			this.schema = this.avroData.getSchema(type);
-			this.reader = new SpecificDatumReader<>(schema, schema, avroData);
-			this.writer = new SpecificDatumWriter<>(schema, avroData);
-		}
-		else {
+			SpecificData specificData = new SpecificData(cl);
+			this.avroData = specificData;
+			this.schema = specificData.getSchema(type);
+			this.reader = new SpecificDatumReader<>(schema, schema, specificData);
+			this.writer = new SpecificDatumWriter<>(schema, specificData);
+		} else if (GenericRecord.class.isAssignableFrom(type)) {
+			if (schema == null) {
+				this.schema = new Schema.Parser().parse(schemaString);
+			}
+			GenericData genericData = new GenericData(cl);
+			this.avroData = genericData;
+			this.reader = new GenericDatumReader<>(schema, schema, genericData);
+			this.writer = new GenericDatumWriter<>(schema, genericData);
+		} else {
 			final ReflectData reflectData = new ReflectData(cl);
 			this.avroData = reflectData;
-			this.schema = this.avroData.getSchema(type);
+			this.schema = reflectData.getSchema(type);
 			this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
 			this.writer = new ReflectDatumWriter<>(schema, reflectData);
 		}
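
With these changes the serializer has two entry points (sketch, not part of the
diff; "schema" stands for any org.apache.avro.Schema instance):

    // SpecificRecord / reflection path: the schema is derived from the class.
    AvroSerializer<Address> specific = new AvroSerializer<>(Address.class);

    // GenericRecord path: the schema must be passed explicitly.
    AvroSerializer<GenericRecord> generic =
        new AvroSerializer<>(GenericRecord.class, schema);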

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
new file mode 100644
index 0000000..83d590a
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * TypeInformation for {@link GenericRecord}.
+ */
+public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
+
+	private static final long serialVersionUID = 4141977586453820650L;
+
+	private transient Schema schema;
+
+	public GenericRecordAvroTypeInfo(Schema schema) {
+		this.schema = checkNotNull(schema);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<GenericRecord> getTypeClass() {
+		return GenericRecord.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) {
+		return new AvroSerializer<>(GenericRecord.class, schema);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("GenericRecord(\"%s\")", schema.toString());
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof GenericRecordAvroTypeInfo) {
+			GenericRecordAvroTypeInfo avroTypeInfo = (GenericRecordAvroTypeInfo) obj;
+			return Objects.equals(avroTypeInfo.schema, schema);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hashCode(schema);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof GenericRecordAvroTypeInfo;
+	}
+
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeUTF(schema.toString());
+	}
+
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		this.schema = new Schema.Parser().parse(ois.readUTF());
+	}
+}
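
A sketch (not part of this commit) of attaching this TypeInformation to a stream
of generic records; toGenericRecord() is a hypothetical mapper, and returns() is
needed because the lambda erases the output type:

    DataStream<GenericRecord> records = input
        .map(value -> toGenericRecord(value))
        .returns(new GenericRecordAvroTypeInfo(schema));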

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
new file mode 100644
index 0000000..dd42469
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * An extension of ByteArrayInputStream that allows changing the buffer to be read
+ * without creating a new ByteArrayInputStream instance. This allows re-using the same
+ * InputStream instance, and the Decoder built on top of it, for every new message.
+ */
+public final class MutableByteArrayInputStream extends ByteArrayInputStream {
+
+	public MutableByteArrayInputStream() {
+		super(new byte[0]);
+	}
+
+	/**
+	 * Set buffer that can be read via the InputStream interface and reset the input stream.
+	 * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
+	 *
+	 * @param buf the new buffer to read.
+	 */
+	public void setBuffer(byte[] buf) {
+		this.buf = buf;
+		this.pos = 0;
+		this.count = buf.length;
+	}
+}
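
The reuse pattern this enables, as used by AvroDeserializationSchema in this
change (sketch; "messages" stands for any sequence of Avro-encoded byte arrays):

    MutableByteArrayInputStream in = new MutableByteArrayInputStream();
    Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    for (byte[] message : messages) {
        in.setBuffer(message);  // no new stream or decoder per message
        // ... read the next record from `decoder` ...
    }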

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
new file mode 100644
index 0000000..8a15fbf
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AvroDeserializationSchema}.
+ */
+public class AvroDeserializationSchemaTest {
+
+	private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
+
+	@Test
+	public void testGenericRecord() throws Exception {
+		DeserializationSchema<GenericRecord> deserializationSchema =
+			AvroDeserializationSchema.forGeneric(
+				address.getSchema()
+			);
+
+		byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
+		GenericRecord genericRecord = deserializationSchema.deserialize(encodedAddress);
+		assertEquals(address.getCity(), genericRecord.get("city").toString());
+		assertEquals(address.getNum(), genericRecord.get("num"));
+		assertEquals(address.getState(), genericRecord.get("state").toString());
+	}
+
+	@Test
+	public void testSpecificRecord() throws Exception {
+		DeserializationSchema<Address> deserializer = AvroDeserializationSchema.forSpecific(Address.class);
+
+		byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
+		Address deserializedAddress = deserializer.deserialize(encodedAddress);
+		assertEquals(address, deserializedAddress);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index 92395ba..f641636 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
 
 /**
  * This test ensures that state and state configuration created by Flink 1.3 Avro types
- * that used the PojoSerializer still works.
+ * that used the PojoSerializer still work (in most cases; see the notice below).
  *
  * <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
  * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
@@ -69,7 +69,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 	private static final int NUM_DATA_ENTRIES = 20;
 
 	@Test
-	public void testCompatibilityWithFlink_1_3() throws Exception {
+	public void testCompatibilityWithPojoSerializer() throws Exception {
 
 		// retrieve the old config snapshot
 
@@ -138,7 +138,7 @@ public class BackwardsCompatibleAvroSerializerTest {
 		}
 	}
 
-// run this code on a 1.3 (or earlier) branch to generate the test data
+// run this code to generate the test data
 //	public static void main(String[] args) throws Exception {
 //
 //		AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 90ac040..e6a2021 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -27,9 +27,15 @@ import org.apache.flink.types.Row;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.specific.SpecificRecord;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -149,4 +155,19 @@ public final class AvroTestUtils {
 
 		return t;
 	}
+
+	/**
+	 * Writes the given record using the specified schema.
+	 *
+	 * @param record record to serialize
+	 * @param schema schema to use for serialization
+	 * @return serialized record
+	 */
+	public static byte[] writeRecord(GenericRecord record, Schema schema) throws IOException {
+		ByteArrayOutputStream stream = new ByteArrayOutputStream();
+		BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
+
+		new GenericDatumWriter<>(schema).write(record, encoder);
+		encoder.flush();
+		return stream.toByteArray();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 9a9061e..9205627 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -97,7 +97,7 @@ public class TestDataGenerator {
 	private static String generateRandomString(Random rnd, int maxLen) {
 		char[] chars = new char[rnd.nextInt(maxLen + 1)];
 		for (int i = 0; i < chars.length; i++) {
-			chars[i] = (char) rnd.nextInt(Character.MAX_VALUE);
+			chars[i] = (char) rnd.nextInt(Character.MIN_SURROGATE);
 		}
 		return new String(chars);
 	}
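
The new bound matters because Character.MIN_SURROGATE is 0xD800: characters at or
above it are UTF-16 surrogates, and an unpaired surrogate does not survive a UTF-8
round trip, so the old Character.MAX_VALUE bound could produce strings that fail
equality checks after serialization. A sketch of the failure:

    String s = String.valueOf((char) 0xD800);          // unpaired surrogate
    byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);  // encoded as '?'
    // new String(utf8, StandardCharsets.UTF_8).equals(s) is false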

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 9685a15..f493d1f 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -32,4 +32,12 @@
      {"name": "type_union", "type": ["null", "boolean", "long", "double"]},
      {"name": "type_nested", "type": ["null", "Address"]}
  ]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+  "type": "record",
+  "name": "SimpleRecord",
+  "fields": [
+      {"name": "name", "type": "string"},
+      {"name": "optionalField",  "type": ["null", "int"], "default": null}
+  ]
 }]

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
index 028c1e6..42eaf5d 100644
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data differ

http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
index 5bfdf728..0599305 100644
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot differ