You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/05/23 14:33:42 UTC
[1/2] flink git commit: [FLINK-9338] Implemented
RegistryAvroDeserializationSchema & provided implementation for Confluent
Schema Registry
Repository: flink
Updated Branches:
refs/heads/master c10e03ff2 -> fed284b14
[FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided implementation for Confluent Schema Registry
This closes #5995
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fed284b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fed284b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fed284b1
Branch: refs/heads/master
Commit: fed284b140459b27e25e651650448eae1d110f0f
Parents: fd101be
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri May 11 18:57:26 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed May 23 16:31:42 2018 +0200
----------------------------------------------------------------------
.../flink-avro-confluent-registry/pom.xml | 94 +++++++++++++
...fluentRegistryAvroDeserializationSchema.java | 136 +++++++++++++++++++
.../confluent/ConfluentSchemaRegistryCoder.java | 67 +++++++++
.../ConfluentSchemaRegistryCoderTest.java | 80 +++++++++++
.../avro/RegistryAvroDeserializationSchema.java | 84 ++++++++++++
.../apache/flink/formats/avro/SchemaCoder.java | 48 +++++++
.../RegistryAvroDeserializationSchemaTest.java | 103 ++++++++++++++
flink-formats/pom.xml | 1 +
8 files changed, 613 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml
new file mode 100644
index 0000000..b421ebf
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-formats</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.6-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-avro-confluent-registry</artifactId>
+
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url> <!-- HTTPS: never fetch build artifacts over plain HTTP -->
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency> <!-- client used to look up writer schemas in the Confluent Schema Registry -->
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-schema-registry-client</artifactId>
+ <version>3.3.1</version>
+ <exclusions> <!-- do not pull in the client's transitive avro and slf4j-log4j12 artifacts -->
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope> <!-- compile against it, but expect it on the runtime classpath already -->
+ </dependency>
+ <dependency> <!-- provides RegistryAvroDeserializationSchema and SchemaCoder, which this module builds on -->
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-flink</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations combine.children="append"> <!-- add to, rather than replace, relocations inherited for this execution -->
+ <relocation> <!-- move jackson-core classes under this module's own shaded package -->
+ <pattern>com.fasterxml.jackson.core</pattern>
+ <shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
new file mode 100644
index 0000000..1135bb9
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.AvroDeserializationSchema;
+import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
+ * Confluent Schema Registry.
+ *
+ * @param <T> type of record it produces
+ */
+public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
+
+ private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000; // capacity used by the two-argument factory methods
+
+ private static final long serialVersionUID = -1671641202177852775L;
+
+ /**
+ * Creates an Avro deserialization schema.
+ *
+ * @param recordClazz class to deserialize into. Should be either
+ * {@link SpecificRecord} or {@link GenericRecord}.
+ * @param reader reader's Avro schema. Should be provided if recordClazz is
+ * {@link GenericRecord}
+ * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
+ */
+ private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+ SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+ super(recordClazz, reader, schemaCoderProvider);
+ }
+
+ /**
+ * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+ * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+ *
+ * @param schema schema of produced records
+ * @param url url of the schema registry to connect to
+ * @return deserialization schema that produces records in form of {@link GenericRecord}
+ */
+ public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
+ return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+ }
+
+ /**
+ * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
+ * using provided reader schema and looks up writer schema in Confluent Schema Registry.
+ *
+ * @param schema schema of produced records
+ * @param url url of the schema registry to connect to
+ * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+ * @return deserialization schema that produces records in form of {@link GenericRecord}
+ */
+ public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
+ int identityMapCapacity) {
+ return new ConfluentRegistryAvroDeserializationSchema<>(
+ GenericRecord.class,
+ schema,
+ new CachedSchemaCoderProvider(url, identityMapCapacity));
+ }
+
+ /**
+ * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
+ * schema and looks up writer schema in Confluent Schema Registry.
+ *
+ * @param tClass class of record to be produced
+ * @param url url of the schema registry to connect to
+ * @return deserialization schema that produces records of the given class
+ */
+ public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
+ String url) {
+ return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
+ }
+
+ /**
+ * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
+ * schema and looks up writer schema in Confluent Schema Registry.
+ *
+ * @param tClass class of record to be produced
+ * @param url url of the schema registry to connect to
+ * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
+ * @return deserialization schema that produces records of the given class
+ */
+ public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
+ String url, int identityMapCapacity) {
+ return new ConfluentRegistryAvroDeserializationSchema<>(
+ tClass,
+ null, // no explicit reader schema is passed for specific records
+ new CachedSchemaCoderProvider(url, identityMapCapacity)
+ );
+ }
+
+ private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider { // holds only url + capacity
+
+ private static final long serialVersionUID = 4023134423033312666L;
+ private final String url;
+ private final int identityMapCapacity;
+
+ CachedSchemaCoderProvider(String url, int identityMapCapacity) {
+ this.url = url;
+ this.identityMapCapacity = identityMapCapacity;
+ }
+
+ @Override
+ public SchemaCoder get() { // every call builds a fresh coder backed by a new registry client
+ return new ConfluentSchemaRegistryCoder(new CachedSchemaRegistryClient(
+ url,
+ identityMapCapacity));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
new file mode 100644
index 0000000..1f2dc69
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import org.apache.flink.formats.avro.SchemaCoder;
+
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import org.apache.avro.Schema;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static java.lang.String.format;
+
+/**
+ * Reads schema using Confluent Schema Registry protocol: a magic byte 0, then an int schema id.
+ */
+public class ConfluentSchemaRegistryCoder implements SchemaCoder {
+
+ private final SchemaRegistryClient schemaRegistryClient;
+
+ /**
+ * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
+ * schema registry.
+ *
+ * @param schemaRegistryClient client used to connect to the schema registry
+ */
+ public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
+ this.schemaRegistryClient = schemaRegistryClient;
+ }
+
+ @Override
+ public Schema readSchema(InputStream in) throws IOException {
+ DataInputStream dataInputStream = new DataInputStream(in); // wrapper only; the caller's stream is not closed here
+
+ if (dataInputStream.readByte() != 0) { // first byte is the format marker and must be 0
+ throw new IOException("Unknown data format. Magic number does not match");
+ } else {
+ int schemaId = dataInputStream.readInt(); // next 4 bytes: id of the schema to fetch from the registry
+
+ try {
+ return schemaRegistryClient.getById(schemaId);
+ } catch (RestClientException e) { // report registry errors as IOException, keeping the original cause
+ throw new IOException(format("Could not find schema with id %s in registry", schemaId), e);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
new file mode 100644
index 0000000..01e807c
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ConfluentSchemaRegistryCoder}.
+ */
+public class ConfluentSchemaRegistryCoderTest {
+
+ @Test
+ public void testSpecificRecordWithConfluentSchemaRegistry() throws Exception {
+ MockSchemaRegistryClient client = new MockSchemaRegistryClient();
+
+ Schema schema = SchemaBuilder.record("testRecord")
+ .fields()
+ .optionalString("testField")
+ .endRecord();
+ int schemaId = client.register("testTopic", schema);
+
+ ConfluentSchemaRegistryCoder registryCoder = new ConfluentSchemaRegistryCoder(client);
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
+ dataOutputStream.writeByte(0); // magic byte the coder accepts
+ dataOutputStream.writeInt(schemaId); // id under which the schema was registered above
+ dataOutputStream.flush();
+
+ ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+ Schema readSchema = registryCoder.readSchema(byteInStream);
+
+ assertEquals(schema, readSchema);
+ assertEquals(0, byteInStream.available()); // the coder consumed exactly the bytes written above
+ }
+
+ @Test(expected = IOException.class)
+ public void testMagicByteVerification() throws Exception {
+ MockSchemaRegistryClient client = new MockSchemaRegistryClient();
+ int schemaId = client.register("testTopic", Schema.create(Schema.Type.BOOLEAN));
+
+ ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(client);
+ ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
+ dataOutputStream.writeByte(5); // any first byte other than 0 must be rejected
+ dataOutputStream.writeInt(schemaId);
+ dataOutputStream.flush();
+
+ ByteArrayInputStream byteInStream = new ByteArrayInputStream(byteOutStream.toByteArray());
+ coder.readSchema(byteInStream);
+
+ // exception is thrown
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
new file mode 100644
index 0000000..f145a75
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
+ *
+ * @param <T> type of record it produces
+ */
+public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
+
+ private static final long serialVersionUID = -884738268437806062L;
+
+ /** Provider for schema coder. Used for initializing in each task. */
+ private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
+
+ /** Coder used for reading schema from incoming stream. */
+ private transient SchemaCoder schemaCoder;
+
+ /**
+ * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
+ *
+ * @param recordClazz class to deserialize into. Should be either
+ * {@link SpecificRecord} or {@link GenericRecord}.
+ * @param reader reader's Avro schema. Should be provided if recordClazz is
+ * {@link GenericRecord}
+ * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
+ * schema reading
+ */
+ protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
+ SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
+ super(recordClazz, reader);
+ this.schemaCoderProvider = schemaCoderProvider;
+ this.schemaCoder = schemaCoderProvider.get(); // eager instance; re-created in checkAvroInitialized() when null
+ }
+
+ @Override
+ public T deserialize(byte[] message) throws IOException {
+ checkAvroInitialized();
+ getInputStream().setBuffer(message); // load this message into the stream handed to the schema coder
+ Schema writerSchema = schemaCoder.readSchema(getInputStream()); // schema the coder reports for this message
+ Schema readerSchema = getReaderSchema();
+
+ GenericDatumReader<T> datumReader = getDatumReader();
+
+ datumReader.setSchema(writerSchema); // schema the bytes were written with
+ datumReader.setExpected(readerSchema); // schema the returned record should conform to
+
+ return datumReader.read(null, getDecoder());
+ }
+
+ @Override
+ void checkAvroInitialized() {
+ super.checkAvroInitialized();
+ if (schemaCoder == null) { // transient field: null after Java deserialization, so rebuild it from the provider
+ this.schemaCoder = schemaCoderProvider.get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
new file mode 100644
index 0000000..e194f77
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+
+/**
+ * Schema coder that allows reading schema that is somehow embedded into serialized record.
+ * Used by {@link RegistryAvroDeserializationSchema}.
+ */
+public interface SchemaCoder {
+ Schema readSchema(InputStream in) throws IOException; // returns the record's schema; may consume bytes from 'in'
+
+ /**
+ * Provider for {@link SchemaCoder}. It allows creating multiple instances of client in
+ * parallel operators without serializing it.
+ */
+ interface SchemaCoderProvider extends Serializable {
+
+ /**
+ * Creates a new instance of {@link SchemaCoder}. Each time it should create a new
+ * instance, as it will be called on multiple nodes.
+ *
+ * @return new instance of {@link SchemaCoder}
+ */
+ SchemaCoder get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
new file mode 100644
index 0000000..4c14d53
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchemaTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.generated.SimpleRecord;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link RegistryAvroDeserializationSchema}.
+ */
+public class RegistryAvroDeserializationSchemaTest {
+
+ private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
+
+ @Test
+ public void testGenericRecordReadWithCompatibleSchema() throws IOException {
+ RegistryAvroDeserializationSchema<GenericRecord> deserializer = new RegistryAvroDeserializationSchema<>(
+ GenericRecord.class,
+ SchemaBuilder.record("Address") // reader schema: street, num and an optional country
+ .fields()
+ .requiredString("street")
+ .requiredInt("num")
+ .optionalString("country")
+ .endRecord(),
+ () -> new SchemaCoder() { // stub coder: ignores the stream and always reports the Address class schema
+ @Override
+ public Schema readSchema(InputStream in) {
+ return Address.getClassSchema();
+ }
+ }
+ );
+
+ GenericRecord genericRecord = deserializer.deserialize(writeRecord(
+ address,
+ Address.getClassSchema()));
+ assertEquals(address.getNum(), genericRecord.get("num"));
+ assertEquals(address.getStreet(), genericRecord.get("street").toString());
+ assertNull(genericRecord.get("city")); // city, state and zip are not part of the reader schema
+ assertNull(genericRecord.get("state"));
+ assertNull(genericRecord.get("zip"));
+ assertNull(genericRecord.get("country")); // presumably absent from the written Address, so the optional default applies
+ }
+
+ @Test
+ public void testSpecificRecordReadMoreFieldsThanWereWritten() throws IOException {
+ Schema smallerUserSchema = new Schema.Parser().parse( // writer schema with only the "name" field
+ "{\"namespace\": \"org.apache.flink.formats.avro.generated\",\n" +
+ " \"type\": \"record\",\n" +
+ " \"name\": \"SimpleRecord\",\n" +
+ " \"fields\": [\n" +
+ " {\"name\": \"name\", \"type\": \"string\"}" +
+ " ]\n" +
+ "}");
+ RegistryAvroDeserializationSchema<SimpleRecord> deserializer = new RegistryAvroDeserializationSchema<>(
+ SimpleRecord.class,
+ null,
+ () -> in -> smallerUserSchema // stub coder: the writer schema is always the one-field record
+ );
+
+ GenericData.Record smallUser = new GenericRecordBuilder(smallerUserSchema)
+ .set("name", "someName")
+ .build();
+
+ SimpleRecord simpleRecord = deserializer.deserialize(writeRecord(
+ smallUser,
+ smallerUserSchema));
+
+ assertEquals("someName", simpleRecord.getName().toString());
+ assertNull(simpleRecord.getOptionalField()); // field was never written, so it must come back unset
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fed284b1/flink-formats/pom.xml
----------------------------------------------------------------------
diff --git a/flink-formats/pom.xml b/flink-formats/pom.xml
index 7cb67e8..2fb2c67 100644
--- a/flink-formats/pom.xml
+++ b/flink-formats/pom.xml
@@ -38,6 +38,7 @@ under the License.
<modules>
<module>flink-avro</module>
<module>flink-json</module>
+ <module>flink-avro-confluent-registry</module>
</modules>
<!-- override these root dependencies as 'provided', so they don't end up
[2/2] flink git commit: [FLINK-9337] Implemented
AvroDeserializationSchema
Posted by dw...@apache.org.
[FLINK-9337] Implemented AvroDeserializationSchema
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd101be1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd101be1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd101be1
Branch: refs/heads/master
Commit: fd101be18cb6f386196e19178ba4da7f1f473fc5
Parents: c10e03f
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Fri May 11 18:55:10 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed May 23 16:31:42 2018 +0200
----------------------------------------------------------------------
.../formats/avro/AvroDeserializationSchema.java | 172 +++++++++++++++++++
.../avro/AvroRowDeserializationSchema.java | 25 +--
.../formats/avro/typeutils/AvroSerializer.java | 77 +++++++--
.../typeutils/GenericRecordAvroTypeInfo.java | 115 +++++++++++++
.../avro/utils/MutableByteArrayInputStream.java | 45 +++++
.../avro/AvroDeserializationSchemaTest.java | 62 +++++++
.../BackwardsCompatibleAvroSerializerTest.java | 6 +-
.../flink/formats/avro/utils/AvroTestUtils.java | 21 +++
.../formats/avro/utils/TestDataGenerator.java | 2 +-
.../src/test/resources/avro/user.avsc | 8 +
.../flink-1.3-avro-type-serialized-data | Bin 23926 -> 23829 bytes
.../flink-1.3-avro-type-serializer-snapshot | Bin 48089 -> 33772 bytes
12 files changed, 490 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
new file mode 100644
index 0000000..2d06476
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
+import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificRecord;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/**
+ * Deserialization schema that deserializes from Avro binary format.
+ *
+ * @param <T> type of record it produces
+ */
+public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
+	 *
+	 * @param schema schema of produced records, must not be null
+	 * @return deserialization schema that produces {@link GenericRecord}s
+	 */
+	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
+		// Fail fast: without a schema the generic variant could only fail later, when the
+		// schema string is parsed lazily in checkAvroInitialized().
+		Preconditions.checkNotNull(schema, "Avro schema must not be null.");
+		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
+	}
+
+	/**
+	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
+	 *
+	 * @param tClass class of record to be produced
+	 * @return deserialization schema that produces records of the given class
+	 */
+	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
+		return new AvroDeserializationSchema<>(tClass, null);
+	}
+
+	private static final long serialVersionUID = -6766681879020862312L;
+
+	/** Class to deserialize to. */
+	private final Class<T> recordClazz;
+
+	/**
+	 * String form of the reader schema, or null if no schema was passed to the constructor.
+	 * Serializable stand-in for the transient {@link #reader} field.
+	 */
+	private final String schemaString;
+
+	/** Reader that deserializes byte array into a record. Created lazily. */
+	private transient GenericDatumReader<T> datumReader;
+
+	/** Input stream to read message from. Created lazily and re-used for every message. */
+	private transient MutableByteArrayInputStream inputStream;
+
+	/** Avro decoder that decodes binary data. Created lazily on top of {@link #inputStream}. */
+	private transient Decoder decoder;
+
+	/** Avro schema for the reader. Transient, so it is null on a freshly deserialized instance. */
+	private transient Schema reader;
+
+	/**
+	 * Creates a Avro deserialization schema.
+	 *
+	 * @param recordClazz class to which deserialize. Should be one of:
+	 *                    {@link org.apache.avro.specific.SpecificRecord},
+	 *                    {@link org.apache.avro.generic.GenericRecord}.
+	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
+	 *                    {@link GenericRecord}
+	 */
+	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
+		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
+		this.recordClazz = recordClazz;
+		this.reader = reader;
+		// keep a serializable copy of the schema; the Schema field itself is transient
+		this.schemaString = reader != null ? reader.toString() : null;
+	}
+
+	GenericDatumReader<T> getDatumReader() {
+		return datumReader;
+	}
+
+	Schema getReaderSchema() {
+		return reader;
+	}
+
+	MutableByteArrayInputStream getInputStream() {
+		return inputStream;
+	}
+
+	Decoder getDecoder() {
+		return decoder;
+	}
+
+	/**
+	 * Deserializes a single Avro binary encoded message.
+	 *
+	 * @param message Avro binary encoded record
+	 * @return the decoded record
+	 * @throws IOException if the datum reader fails to read the record
+	 */
+	@Override
+	public T deserialize(byte[] message) throws IOException {
+		checkAvroInitialized();
+		// point the re-used stream (the decoder was created on top of it) at the new message
+		inputStream.setBuffer(message);
+
+		Schema readerSchema = getReaderSchema();
+		GenericDatumReader<T> recordReader = getDatumReader();
+		recordReader.setSchema(readerSchema);
+
+		return recordReader.read(null, decoder);
+	}
+
+	/**
+	 * Lazily creates the transient Avro runtime objects (datum reader, reader schema,
+	 * input stream and decoder). Does nothing if the datum reader already exists.
+	 */
+	void checkAvroInitialized() {
+		if (datumReader != null) {
+			return;
+		}
+
+		ClassLoader cl = Thread.currentThread().getContextClassLoader();
+		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
+			// generated classes: the schema is looked up from the record class
+			SpecificData specificData = new SpecificData(cl);
+			this.datumReader = new SpecificDatumReader<>(specificData);
+			this.reader = specificData.getSchema(recordClazz);
+		} else {
+			// generic records: the schema is restored from its string form
+			this.reader = new Schema.Parser().parse(schemaString);
+			GenericData genericData = new GenericData(cl);
+			this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
+		}
+
+		this.inputStream = new MutableByteArrayInputStream();
+		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
+	}
+
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return false;
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public TypeInformation<T> getProducedType() {
+		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
+			return new AvroTypeInfo(recordClazz, false);
+		}
+
+		// 'reader' is transient: on an instance restored through Java serialization it stays null
+		// until checkAvroInitialized() has run, and GenericRecordAvroTypeInfo rejects a null schema.
+		// Fall back to the serialized schema string in that case.
+		Schema producedSchema = reader != null ? reader : new Schema.Parser().parse(schemaString);
+		return (TypeInformation<T>) new GenericRecordAvroTypeInfo(producedSchema);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
index 4a3c02e..a8422a4 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowDeserializationSchema.java
@@ -18,6 +18,7 @@
package org.apache.flink.formats.avro;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
+import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -31,7 +32,6 @@ import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -153,27 +153,4 @@ public class AvroRowDeserializationSchema extends AbstractDeserializationSchema<
}
}
- /**
- * An extension of the ByteArrayInputStream that allows to change a buffer that should be
- * read without creating a new ByteArrayInputStream instance. This allows to re-use the same
- * InputStream instance, copying message to process, and creation of Decoder on every new message.
- */
- private static final class MutableByteArrayInputStream extends ByteArrayInputStream {
-
- public MutableByteArrayInputStream() {
- super(new byte[0]);
- }
-
- /**
- * Set buffer that can be read via the InputStream interface and reset the input stream.
- * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
- *
- * @param buf the new buffer to read.
- */
- public void setBuffer(byte[] buf) {
- this.buf = buf;
- this.pos = 0;
- this.count = buf.length;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
index 75f2988..b313625 100644
--- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java
@@ -32,6 +32,10 @@ import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
@@ -44,15 +48,19 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A serializer that serializes types via Avro.
*
- * <p>The serializer supports both efficient specific record serialization for
- * types generated via Avro, as well as serialization via reflection
- * (ReflectDatumReader / -Writer). The serializer instantiates them depending on
- * the class of the type it should serialize.
+ * <p>The serializer supports:
+ * <ul>
+ * <li>efficient specific record serialization for types generated via Avro</li>
+ * <li>serialization via reflection (ReflectDatumReader / -Writer)</li>
+ * <li>serialization of generic records via GenericDatumReader / -Writer</li>
+ * </ul>
+ * The serializer instantiates them depending on the class of the type it should serialize.
*
* <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders
* and decoders which have buffers that would be shared between the threads if used concurrently
@@ -77,15 +85,17 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
/** The class of the type that is serialized by this serializer. */
private final Class<T> type;
+ private final String schemaString;
+
// -------- runtime fields, non-serializable, lazily initialized -----------
- private transient SpecificDatumWriter<T> writer;
- private transient SpecificDatumReader<T> reader;
+ private transient GenericDatumWriter<T> writer;
+ private transient GenericDatumReader<T> reader;
private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;
- private transient SpecificData avroData;
+ private transient GenericData avroData;
private transient Schema schema;
@@ -99,9 +109,28 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
/**
* Creates a new AvroSerializer for the type indicated by the given class.
+ * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
+ * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
*/
public AvroSerializer(Class<T> type) {
+ checkArgument(!isGenericRecord(type),
+ "For GenericData.Record use constructor with explicit schema.");
+ this.type = checkNotNull(type);
+ this.schemaString = null;
+ }
+
+ /**
+ * Creates a new AvroSerializer for the type indicated by the given class.
+ * This constructor is expected to be used only with {@link GenericData.Record}.
+ * For {@link SpecificRecord} or reflection serializer use
+ * {@link AvroSerializer#AvroSerializer(Class)}
+ */
+ public AvroSerializer(Class<T> type, Schema schema) {
+ checkArgument(isGenericRecord(type),
+ "For classes other than GenericData.Record use constructor without explicit schema.");
this.type = checkNotNull(type);
+ this.schema = checkNotNull(schema);
+ this.schemaString = schema.toString();
}
/**
@@ -275,9 +304,19 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
// Utilities
// ------------------------------------------------------------------------
+	/**
+	 * Tells whether the given class is a {@link GenericRecord} that is not also a
+	 * {@link SpecificRecord}.
+	 */
+	private static boolean isGenericRecord(Class<?> type) {
+		if (SpecificRecord.class.isAssignableFrom(type)) {
+			return false;
+		}
+		return GenericRecord.class.isAssignableFrom(type);
+	}
+
@Override
public TypeSerializer<T> duplicate() {
- return new AvroSerializer<>(type);
+		if (schemaString == null) {
+			// specific / reflect case: the schema is derived from the class itself
+			return new AvroSerializer<>(type);
+		}
+
+		// 'schema' is a transient, lazily initialized field: on a serializer restored through
+		// Java serialization it may still be null here, and the two-argument constructor
+		// rejects a null schema. Re-create it from the serialized schema string in that case.
+		final Schema currentSchema = schema != null ? schema : new Schema.Parser().parse(schemaString);
+		return new AvroSerializer<>(type, currentSchema);
}
@Override
@@ -323,15 +362,23 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
final ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (SpecificRecord.class.isAssignableFrom(type)) {
- this.avroData = new SpecificData(cl);
- this.schema = this.avroData.getSchema(type);
- this.reader = new SpecificDatumReader<>(schema, schema, avroData);
- this.writer = new SpecificDatumWriter<>(schema, avroData);
- }
- else {
+ SpecificData specificData = new SpecificData(cl);
+ this.avroData = specificData;
+ this.schema = specificData.getSchema(type);
+ this.reader = new SpecificDatumReader<>(schema, schema, specificData);
+ this.writer = new SpecificDatumWriter<>(schema, specificData);
+ } else if (GenericRecord.class.isAssignableFrom(type)) {
+ if (schema == null) {
+ this.schema = new Schema.Parser().parse(schemaString);
+ }
+ GenericData genericData = new GenericData(cl);
+ this.avroData = genericData;
+ this.reader = new GenericDatumReader<>(schema, schema, genericData);
+ this.writer = new GenericDatumWriter<>(schema, genericData);
+ } else {
final ReflectData reflectData = new ReflectData(cl);
this.avroData = reflectData;
- this.schema = this.avroData.getSchema(type);
+ this.schema = reflectData.getSchema(type);
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
new file mode 100644
index 0000000..83d590a
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/GenericRecordAvroTypeInfo.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.typeutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * TypeInformation for {@link GenericRecord}.
+ */
+public class GenericRecordAvroTypeInfo extends TypeInformation<GenericRecord> {
+
+	private static final long serialVersionUID = 4141977586453820650L;
+
+	/** Schema of the described records. Transient; written and restored as a string, see below. */
+	private transient Schema schema;
+
+	/**
+	 * Creates type information for generic records of the given schema.
+	 *
+	 * @param schema schema of the records, must not be null
+	 */
+	public GenericRecordAvroTypeInfo(Schema schema) {
+		this.schema = checkNotNull(schema);
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 1;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 1;
+	}
+
+	@Override
+	public Class<GenericRecord> getTypeClass() {
+		return GenericRecord.class;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public TypeSerializer<GenericRecord> createSerializer(ExecutionConfig config) {
+		return new AvroSerializer<>(GenericRecord.class, schema);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("GenericRecord(\"%s\")", schema.toString());
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof GenericRecordAvroTypeInfo) {
+			GenericRecordAvroTypeInfo avroTypeInfo = (GenericRecordAvroTypeInfo) obj;
+			return Objects.equals(avroTypeInfo.schema, schema);
+		} else {
+			return false;
+		}
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hashCode(schema);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof GenericRecordAvroTypeInfo;
+	}
+
+	/**
+	 * Writes the schema in its string form.
+	 *
+	 * <p>The string is written with {@code writeObject} rather than {@code writeUTF}:
+	 * {@code writeUTF} is limited to 65535 encoded bytes and throws a
+	 * {@code UTFDataFormatException} for larger schemas.
+	 */
+	private void writeObject(ObjectOutputStream oos) throws IOException {
+		oos.writeObject(schema.toString());
+	}
+
+	/**
+	 * Restores the schema by parsing the string written by {@code writeObject}.
+	 */
+	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
+		final String schemaString = (String) ois.readObject();
+		this.schema = new Schema.Parser().parse(schemaString);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
new file mode 100644
index 0000000..dd42469
--- /dev/null
+++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/utils/MutableByteArrayInputStream.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.utils;
+
+import java.io.ByteArrayInputStream;
+
+/**
+ * An extension of the ByteArrayInputStream that allows to change the buffer that should be
+ * read without creating a new ByteArrayInputStream instance. This allows to re-use the same
+ * InputStream instance for every new message, instead of copying the message to process or
+ * creating a new stream (and a new Decoder on top of it) each time.
+ */
+public final class MutableByteArrayInputStream extends ByteArrayInputStream {
+
+	/** Creates a stream over an empty buffer; call {@link #setBuffer(byte[])} before reading. */
+	public MutableByteArrayInputStream() {
+		super(new byte[0]);
+	}
+
+	/**
+	 * Set buffer that can be read via the InputStream interface and reset the input stream.
+	 * This has the same effect as creating a new ByteArrayInputStream with a new buffer.
+	 *
+	 * @param buf the new buffer to read.
+	 */
+	public void setBuffer(byte[] buf) {
+		this.buf = buf;
+		this.pos = 0;
+		// A mark set on the previous buffer must not survive: a freshly created
+		// ByteArrayInputStream has its mark at 0, so reset() has to return to the start.
+		this.mark = 0;
+		this.count = buf.length;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
new file mode 100644
index 0000000..8a15fbf
--- /dev/null
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroDeserializationSchemaTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.formats.avro.generated.Address;
+import org.apache.flink.formats.avro.utils.TestDataGenerator;
+
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Test;
+
+import java.util.Random;
+
+import static org.apache.flink.formats.avro.utils.AvroTestUtils.writeRecord;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AvroDeserializationSchema}.
+ */
+public class AvroDeserializationSchemaTest {
+
+	/** Randomly generated record that is encoded and read back by every test. */
+	private static final Address address = TestDataGenerator.generateRandomAddress(new Random());
+
+	/** A record written with the Address schema is read back as a {@link GenericRecord}. */
+	@Test
+	public void testGenericRecord() throws Exception {
+		DeserializationSchema<GenericRecord> deserializationSchema =
+			AvroDeserializationSchema.forGeneric(
+				address.getSchema()
+			);
+
+		byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
+		GenericRecord genericRecord = deserializationSchema.deserialize(encodedAddress);
+		// string fields are compared through toString() of the decoded value
+		assertEquals(address.getCity(), genericRecord.get("city").toString());
+		assertEquals(address.getNum(), genericRecord.get("num"));
+		assertEquals(address.getState(), genericRecord.get("state").toString());
+	}
+
+	/** A record written with the Address schema is read back as the generated Address class. */
+	@Test
+	public void testSpecificRecord() throws Exception {
+		DeserializationSchema<Address> deserializer = AvroDeserializationSchema.forSpecific(Address.class);
+
+		byte[] encodedAddress = writeRecord(address, Address.getClassSchema());
+		Address deserializedAddress = deserializer.deserialize(encodedAddress);
+		assertEquals(address, deserializedAddress);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
index 92395ba..f641636 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/BackwardsCompatibleAvroSerializerTest.java
@@ -43,7 +43,7 @@ import static org.junit.Assert.assertTrue;
/**
* This test ensures that state and state configuration created by Flink 1.3 Avro types
- * that used the PojoSerializer still works.
+ * that used the PojoSerializer still works (in most cases, see notice below).
*
* <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
* and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
@@ -69,7 +69,7 @@ public class BackwardsCompatibleAvroSerializerTest {
private static final int NUM_DATA_ENTRIES = 20;
@Test
- public void testCompatibilityWithFlink_1_3() throws Exception {
+ public void testCompatibilityWithPojoSerializer() throws Exception {
// retrieve the old config snapshot
@@ -138,7 +138,7 @@ public class BackwardsCompatibleAvroSerializerTest {
}
}
-// run this code on a 1.3 (or earlier) branch to generate the test data
+// run this code to generate the test data
// public static void main(String[] args) throws Exception {
//
// AvroTypeInfo<User> typeInfo = new AvroTypeInfo<>(User.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
index 90ac040..e6a2021 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/AvroTestUtils.java
@@ -27,9 +27,15 @@ import org.apache.flink.types.Row;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.specific.SpecificRecord;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -149,4 +155,19 @@ public final class AvroTestUtils {
return t;
}
+
+	/**
+	 * Encodes the given record in Avro binary format using the specified schema.
+	 *
+	 * @param record record to serialize
+	 * @param schema schema to use for serialization
+	 * @return serialized record
+	 * @throws IOException if writing the record or flushing the encoder fails
+	 */
+	public static byte[] writeRecord(GenericRecord record, Schema schema) throws IOException {
+		final ByteArrayOutputStream out = new ByteArrayOutputStream();
+		final BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(out, null);
+		final GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+
+		datumWriter.write(record, binaryEncoder);
+		// the encoder is flushed before the bytes are taken from the stream
+		binaryEncoder.flush();
+		return out.toByteArray();
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
index 9a9061e..9205627 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/utils/TestDataGenerator.java
@@ -97,7 +97,7 @@ public class TestDataGenerator {
private static String generateRandomString(Random rnd, int maxLen) {
char[] chars = new char[rnd.nextInt(maxLen + 1)];
for (int i = 0; i < chars.length; i++) {
- chars[i] = (char) rnd.nextInt(Character.MAX_VALUE);
+ chars[i] = (char) rnd.nextInt(Character.MIN_SURROGATE);
}
return new String(chars);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/avro/user.avsc b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
index 9685a15..f493d1f 100644
--- a/flink-formats/flink-avro/src/test/resources/avro/user.avsc
+++ b/flink-formats/flink-avro/src/test/resources/avro/user.avsc
@@ -32,4 +32,12 @@
{"name": "type_union", "type": ["null", "boolean", "long", "double"]},
{"name": "type_nested", "type": ["null", "Address"]}
]
+},
+ {"namespace": "org.apache.flink.formats.avro.generated",
+ "type": "record",
+ "name": "SimpleRecord",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "optionalField", "type": ["null", "int"], "default": null}
+ ]
}]
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data
index 028c1e6..42eaf5d 100644
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serialized-data differ
http://git-wip-us.apache.org/repos/asf/flink/blob/fd101be1/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
----------------------------------------------------------------------
diff --git a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot
index 5bfdf728..0599305 100644
Binary files a/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot and b/flink-formats/flink-avro/src/test/resources/flink-1.3-avro-type-serializer-snapshot differ