Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2018/05/11 20:11:57 UTC

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

GitHub user dawidwys opened a pull request:

    https://github.com/apache/flink/pull/5995

     [FLINK-9337] Implemented AvroDeserializationSchema

    ## What is the purpose of the change
    Provides an implementation of AvroDeserializationSchema that reads records serialized in Avro binary format, and also provides a variant that uses the Confluent Schema Registry to look up the writer's schema.
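
    For illustration, a minimal sketch of how the two deserialization schemas might be used (the inline schema definition and the registry URL below are placeholders, not taken from this PR):

        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.flink.formats.avro.AvroDeserializationSchema;
        import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;

        class Example {
            public static void main(String[] args) {
                Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                        + "[{\"name\":\"name\",\"type\":\"string\"}]}");

                // reads plain Avro binary using the provided reader schema
                AvroDeserializationSchema<GenericRecord> plain =
                    AvroDeserializationSchema.forGeneric(schema);

                // additionally looks up the writer schema in a Confluent Schema Registry
                ConfluentRegistryAvroDeserializationSchema<GenericRecord> withRegistry =
                    ConfluentRegistryAvroDeserializationSchema.forGeneric(
                        schema, "http://localhost:8081");
            }
        }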
    
    
    ## Brief change log
    
      - Implemented AvroDeserializationSchema / RegistryAvroDeserializationSchema / ConfluentRegistryAvroDeserializationSchema
      - Extended AvroSerializer to handle GenericRecords
      - Added GenericRecordTypeInformation
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (**yes** / no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (**yes** / no / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (**yes** / no)
      - If yes, how is the feature documented? (not applicable / docs / **JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dawidwys/flink avro-deserializer2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5995.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5995
    
----
commit 885c31daa5ad9db03924311a6f443b72795d6ca3
Author: Dawid Wysakowicz <dw...@...>
Date:   2018-05-11T16:55:10Z

    [FLINK-9337] Implemented AvroDeserializationSchema

commit 4f1b398837d83d1a8be9a697a686a2ff54c5b22c
Author: Dawid Wysakowicz <dw...@...>
Date:   2018-05-11T16:57:26Z

    [FLINK-9338] Implemented RegistryAvroDeserializationSchema & provided implementation for Confluent Schema Registry

----


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188321914
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T>
    +	extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(
    +		Class<T> recordClazz,
    +		@Nullable Schema reader,
    +		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    +		Schema schema, String url) {
    +		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema              schema of produced records
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    +		Schema schema, String url, int identityMapCapacity) {
    +		return new ConfluentRegistryAvroDeserializationSchema<>(
    +			GenericRecord.class,
    +			schema,
    +			new CachedSchemaCoderProvider(url, identityMapCapacity));
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
    +	 * schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    +		Class<T> tClass, String url) {
    +		return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
    +	 * schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param tClass              class of record to be produced
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(
    +		Class<T> tClass, String url, int identityMapCapacity) {
    +		return new ConfluentRegistryAvroDeserializationSchema<>(
    +			tClass,
    +			null,
    +			new CachedSchemaCoderProvider(url, identityMapCapacity)
    +		);
    +	}
    +
    +	private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {
    +
    +		private final String url;
    +		private final int identityMapCapacity;
    +
    +		private CachedSchemaCoderProvider(String url, int identityMapCapacity) {
    --- End diff --
    
    A private constructor in an inner class requires a synthetic bridge constructor, or the compiler has to widen the visibility. I think it is good practice to give inner classes package-private constructors.
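
    For reference, a minimal sketch of the suggested change (class and field names as in the diff above; it assumes the provider interface exposes a single `get()` method):

        private static class CachedSchemaCoderProvider implements SchemaCoder.SchemaCoderProvider {

            private final String url;
            private final int identityMapCapacity;

            // package-private: the enclosing class can call this directly,
            // without a compiler-generated synthetic bridge
            CachedSchemaCoderProvider(String url, int identityMapCapacity) {
                this.url = url;
                this.identityMapCapacity = identityMapCapacity;
            }

            @Override
            public SchemaCoder get() {
                return new ConfluentSchemaRegistryCoder(
                    new CachedSchemaRegistryClient(url, identityMapCapacity));
            }
        }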


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r198150284
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	private static final long serialVersionUID = -1671641202177852775L;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
    +			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
    +		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema              schema of produced records
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
    +			int identityMapCapacity) {
    +		return new ConfluentRegistryAvroDeserializationSchema<>(
    +			GenericRecord.class,
    +			schema,
    +			new CachedSchemaCoderProvider(url, identityMapCapacity));
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
    +	 * schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
    +			String url) {
    +		return forSpecific(tClass, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro
    +	 * schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param tClass              class of record to be produced
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> ConfluentRegistryAvroDeserializationSchema<T> forSpecific(Class<T> tClass,
    --- End diff --
    
    @dawidwys couldn't this be `private`?


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by medcv <gi...@git.apache.org>.
Github user medcv commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r198150153
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	private static final long serialVersionUID = -1671641202177852775L;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
    +			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
    +		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema              schema of produced records
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
    --- End diff --
    
    @dawidwys couldn't this be `private`?


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    Added a few more comments, most importantly around exception wrapping.
    Otherwise, looking good...


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188355756
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
     	//  Utilities
     	// ------------------------------------------------------------------------
     
    +	private static boolean isGenericRecord(Class<?> type) {
    +		return !SpecificRecord.class.isAssignableFrom(type) &&
    +			GenericRecord.class.isAssignableFrom(type);
    +	}
    +
     	@Override
     	public TypeSerializer<T> duplicate() {
    -		return new AvroSerializer<>(type);
    +		if (schemaString != null) {
    +			return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
    --- End diff --
    
    Duplication happens frequently, so it would be good to avoid re-parsing the schema. You can add a private copy constructor that takes the class and the schema string.
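
    A sketch of the suggested copy constructor (field names per the diff; it assumes the other constructors keep populating both `type` and `schemaString`):

        private AvroSerializer(Class<T> type, String schemaString) {
            this.type = type;
            this.schemaString = schemaString;
        }

        @Override
        public TypeSerializer<T> duplicate() {
            // reuses the already-serialized schema string; the duplicate
            // parses it lazily when the transient schema field is first needed
            return new AvroSerializer<>(type, schemaString);
        }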


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188386286
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    +
    +	/**
    +	 * Reader that deserializes byte array into a record.
    +	 */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/**
    +	 * Input stream to read message from.
    +	 */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private transient Decoder decoder;
    +
    +	/**
    +	 * Avro schema for the reader.
    +	 */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		if (datumReader != null) {
    +			return datumReader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new SpecificDatumReader<>();
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new GenericDatumReader<>();
    +		} else {
    +			this.datumReader = new ReflectDatumReader<>();
    +		}
    +
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		if (reader != null) {
    +			return reader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.reader = SpecificData.get().getSchema(recordClazz);
    --- End diff --
    
    Didn't think about it, sorry.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188316052
  
    --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
    @@ -0,0 +1,94 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<parent>
    +		<artifactId>flink-formats</artifactId>
    +		<groupId>org.apache.flink</groupId>
    +		<version>1.6-SNAPSHOT</version>
    +	</parent>
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<artifactId>flink-avro-confluent-registry</artifactId>
    --- End diff --
    
    That is a good question - maybe eventually. We could leave it in `flink-formats` for now until we have a case to create `flink-catalogs`.
    
    This is also not full-blown catalog support, as in the Table API, but something much simpler - just multiple Avro schemas.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188328926
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    --- End diff --
    
    I would avoid the null initialization; it is redundant. It does nothing (fields are null anyway) but actually exists as bytecode, hence costs CPU cycles.
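
    In other words, a before/after illustration of the point:

        // before: compiles to an explicit null store in the constructor
        private String schemaString = null;

        // after: equivalent, since fields default to null anyway
        private String schemaString;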


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188328437
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    --- End diff --
    
    Double Apache header


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r187992680
  
    --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
    @@ -0,0 +1,94 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<parent>
    +		<artifactId>flink-formats</artifactId>
    +		<groupId>org.apache.flink</groupId>
    +		<version>1.6-SNAPSHOT</version>
    +	</parent>
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<artifactId>flink-avro-confluent-registry</artifactId>
    --- End diff --
    
    Do we really want the Confluent Schema Registry code to be in `flink-formats`? Shouldn't this be in something like `flink-catalogs` in the future?


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    Looks good, thanks!
    
    +1 to merge this


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188325819
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoder.java ---
    @@ -0,0 +1,59 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    +import org.apache.avro.Schema;
    +
    +import java.io.DataInputStream;
    +import java.io.InputStream;
    +
    +/**
    + * Reads schema using Confluent Schema Registry protocol.
    + */
    +public class ConfluentSchemaRegistryCoder implements SchemaCoder {
    +
    +	private final SchemaRegistryClient schemaRegistryClient;
    +
    +	/**
    +	 * Creates {@link SchemaCoder} that uses provided {@link SchemaRegistryClient} to connect to
    +	 * schema registry.
    +	 *
    +	 * @param schemaRegistryClient client to connect schema registry
    +	 */
    +	public ConfluentSchemaRegistryCoder(SchemaRegistryClient schemaRegistryClient) {
    +		this.schemaRegistryClient = schemaRegistryClient;
    +	}
    +
    +	@Override
    +	public Schema readSchema(InputStream in) throws Exception {
    +		DataInputStream dataInputStream = new DataInputStream(in);
    +
    +		if (dataInputStream.readByte() != 0) {
    +			throw new RuntimeException("Unknown data format. Magic number does not match");
    --- End diff --
    
    RuntimeExceptions (unchecked exceptions) are usually used to indicate programming errors, or (as a workaround) when the scope does not allow throwing a checked exception.
    
    This is a case for a checked exception, in my opinion, such as an `IOException`, `FlinkException`, etc.
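
    A sketch of `readSchema` in `ConfluentSchemaRegistryCoder` with a checked exception instead. The schema-id lookup after the magic byte is an assumption here, since the diff is truncated at that point, and the client method name is per newer Confluent clients (older versions spell it `getByID`):

        @Override
        public Schema readSchema(InputStream in) throws IOException {
            DataInputStream dataInputStream = new DataInputStream(in);

            if (dataInputStream.readByte() != 0) {
                // a malformed message is an I/O-level problem, not a programming error
                throw new IOException("Unknown data format. Magic number does not match");
            }

            int schemaId = dataInputStream.readInt();
            try {
                return schemaRegistryClient.getById(schemaId);
            } catch (RestClientException e) {
                throw new IOException("Could not fetch schema " + schemaId + " from the registry", e);
            }
        }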


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188353074
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -79,15 +87,16 @@
     
     	// -------- runtime fields, non-serializable, lazily initialized -----------
     
    -	private transient SpecificDatumWriter<T> writer;
    -	private transient SpecificDatumReader<T> reader;
    +	private transient GenericDatumWriter<T> writer;
    +	private transient GenericDatumReader<T> reader;
     
     	private transient DataOutputEncoder encoder;
     	private transient DataInputDecoder decoder;
     
    -	private transient SpecificData avroData;
    +	private transient GenericData avroData;
     
     	private transient Schema schema;
    +	private final String schemaString;
    --- End diff --
    
    As per the comments, the existing code orders config fields before runtime fields. Can you place the schema string to match that pattern?
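
    That is, roughly (field names per the diff):

        // -------- configuration fields, serializable ------------------------------

        private final Class<T> type;
        private final String schemaString;

        // -------- runtime fields, non-serializable, lazily initialized -----------

        private transient GenericDatumWriter<T> writer;
        private transient GenericDatumReader<T> reader;
        private transient Schema schema;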


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    As for the snapshot binary data, I do understand that it should be created with the appropriate Flink version (in this case, in theory, with Flink 1.3), and I tried really hard to do so until I found out that this test is incompatible with 1.3 and that the data could not be generated with Flink 1.3. Later I found the comment on the test class that also states so:
    
    > <p><b>Important:</b> Since Avro itself broke class compatibility between 1.7.7 (used in Flink 1.3)
    > * and 1.8.2 (used in Flink 1.4), the Avro by Pojo compatibility is broken through Avro already.
    > * This test only tests that the Avro serializer change (switching from Pojo to Avro for Avro types)
    > * works properly.
    
    Also, the commented-out code does not compile with Flink 1.3 (but this is a minor thing).
    
    Data serialized with the Avro version used in Flink 1.3 (1.7.7) is not binary compatible with Avro 1.8.2 (in Flink 1.4+), due to changes in how SpecificFixed is constructed.
    
    Therefore, I regenerated the snapshot data by running the commented-out code on the current branch. That is also why I changed a few descriptions in that test, as it tests compatibility of `PojoSerializer` with `AvroSerializer` rather than binary backwards compatibility.
    
    Nevertheless, I am more than happy to hear any comments on that.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r189197766
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	private static final long serialVersionUID = -6766681879020862312L;
    +
    +	/** Class to deserialize to. */
    +	private final Class<T> recordClazz;
    +
    +	/** Schema in case of GenericRecord for serialization purpose. */
    +	private final String schemaString;
    +
    +	/** Reader that deserializes byte array into a record. */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/** Input stream to read message from. */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/** Avro decoder that decodes binary data. */
    +	private transient Decoder decoder;
    +
    +	/** Avro schema for the reader. */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		} else {
    +			this.schemaString = null;
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		return reader;
    +	}
    +
    +	MutableByteArrayInputStream getInputStream() {
    +		return inputStream;
    +	}
    +
    +	Decoder getDecoder() {
    +		return decoder;
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			checkAvroInitialized();
    +			inputStream.setBuffer(message);
    +			Schema readerSchema = getReaderSchema();
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(readerSchema);
    +
    +			return datumReader.read(null, decoder);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize message.", e);
    --- End diff --
    
    Unnecessary exception wrapping, see other comment.
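
    A sketch of `deserialize` without the wrapping, assuming `DeserializationSchema#deserialize` may declare `IOException` (helper methods as in the diff above):

        @Override
        public T deserialize(byte[] message) throws IOException {
            checkAvroInitialized();
            inputStream.setBuffer(message);

            GenericDatumReader<T> datumReader = getDatumReader();
            datumReader.setSchema(getReaderSchema());

            // let I/O and decoding problems propagate as checked exceptions
            // instead of being wrapped in a RuntimeException
            return datumReader.read(null, decoder);
        }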


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    @StephanEwen could you have another look?


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188349118
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    +
    +	/**
    +	 * Reader that deserializes byte array into a record.
    +	 */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/**
    +	 * Input stream to read message from.
    +	 */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private transient Decoder decoder;
    +
    +	/**
    +	 * Avro schema for the reader.
    +	 */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates an Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		if (datumReader != null) {
    +			return datumReader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new SpecificDatumReader<>();
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new GenericDatumReader<>();
    +		} else {
    +			this.datumReader = new ReflectDatumReader<>();
    +		}
    +
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		if (reader != null) {
    +			return reader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.reader = SpecificData.get().getSchema(recordClazz);
    --- End diff --
    
    Do we need to pass the user code class loader into the SpecificData / GenericData / etc., in order to resolve nested user-defined types?
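
    For illustration, one way this might look (using the thread context class loader is an assumption; Avro 1.8's `SpecificData` has a constructor that takes a `ClassLoader`):

        ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
        this.reader = new SpecificData(userCodeClassLoader).getSchema(recordClazz);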


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188349853
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
    + *
    + * @param <T> type of record it produces
    + */
    +public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    +	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
    +	private transient SchemaCoder schemaCoder;
    +
    +	/**
    +	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
    +	 *                            schema reading
    +	 */
    +	protected RegistryAvroDeserializationSchema(
    +		Class<T> recordClazz,
    +		@Nullable Schema reader,
    +		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader);
    --- End diff --
    
    Empty line / indentation


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r189195014
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
    + *
    + * @param <T> type of record it produces
    + */
    +public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    +
    +	private static final long serialVersionUID = -884738268437806062L;
    +
    +	/** Provider for schema coder. Used for initializing in each task. */
    +	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
    +
    +	/** Coder used for reading schema from incoming stream. */
    +	private transient SchemaCoder schemaCoder;
    +
    +	/**
    +	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
    +	 *                            schema reading
    +	 */
    +	protected RegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
    +			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader);
    +		this.schemaCoderProvider = schemaCoderProvider;
    +		this.schemaCoder = schemaCoderProvider.get();
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			checkAvroInitialized();
    +			getInputStream().setBuffer(message);
    +			Schema writerSchema = schemaCoder.readSchema(getInputStream());
    +			Schema readerSchema = getReaderSchema();
    +
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(writerSchema);
    +			datumReader.setExpected(readerSchema);
    +
    +			return datumReader.read(null, getDecoder());
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize Row.", e);
    --- End diff --
    
    The method `deserialize()` can throw an IOException. That got dropped from the signature, and exceptions are now wrapped into a RuntimeException. That makes exception stack traces more complicated, and hides the fact that "there is a possible exceptional case to handle" from the consumers of that code.
    
    I think this makes for a general rule: whenever using `RuntimeException`, take a step back, look at the exception structure and signatures, and see if something is not declared well.
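    
    A minimal sketch of the shape I have in mind, reusing the helpers of this class (note that `DeserializationSchema#deserialize` already declares `throws IOException` in its signature):
    
    ```java
    @Override
    public T deserialize(byte[] message) throws IOException {
    	checkAvroInitialized();
    	getInputStream().setBuffer(message);
    	Schema writerSchema = schemaCoder.readSchema(getInputStream());
    	Schema readerSchema = getReaderSchema();
    
    	GenericDatumReader<T> datumReader = getDatumReader();
    	datumReader.setSchema(writerSchema);
    	datumReader.setExpected(readerSchema);
    
    	// IOExceptions from the read propagate directly, no RuntimeException wrapping
    	return datumReader.read(null, getDecoder());
    }
    ```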


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r189197633
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	private static final long serialVersionUID = -6766681879020862312L;
    +
    +	/** Class to deserialize to. */
    +	private final Class<T> recordClazz;
    +
    +	/** Schema in case of GenericRecord for serialization purpose. */
    +	private final String schemaString;
    +
    +	/** Reader that deserializes byte array into a record. */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/** Input stream to read message from. */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/** Avro decoder that decodes binary data. */
    +	private transient Decoder decoder;
    +
    +	/** Avro schema for the reader. */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		} else {
    +			this.schemaString = null;
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    --- End diff --
    
    Minor comment: I found it helps code structure/readability to move static/factory methods either to the top or the bottom of the class.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188316643
  
    --- Diff: flink-formats/flink-avro-confluent-registry/pom.xml ---
    @@ -0,0 +1,94 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements.  See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership.  The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License.  You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied.  See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +	<parent>
    +		<artifactId>flink-formats</artifactId>
    +		<groupId>org.apache.flink</groupId>
    +		<version>1.6-SNAPSHOT</version>
    +	</parent>
    +	<modelVersion>4.0.0</modelVersion>
    +
    +	<artifactId>flink-avro-confluent-registry</artifactId>
    +
    +	<repositories>
    +		<repository>
    +			<id>confluent</id>
    +			<url>http://packages.confluent.io/maven/</url>
    +		</repository>
    +	</repositories>
    +
    +	<dependencies>
    +		<dependency>
    +			<groupId>io.confluent</groupId>
    +			<artifactId>kafka-schema-registry-client</artifactId>
    +			<version>3.3.1</version>
    +			<exclusions>
    +				<exclusion>
    +					<groupId>org.apache.avro</groupId>
    +					<artifactId>avro</artifactId>
    +				</exclusion>
    +				<exclusion>
    +					<groupId>org.slf4j</groupId>
    +					<artifactId>slf4j-log4j12</artifactId>
    +				</exclusion>
    +			</exclusions>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-core</artifactId>
    +			<version>${project.version}</version>
    +			<scope>provided</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-avro</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +	</dependencies>
    +
    +	<build>
    +		<plugins>
    +			<plugin>
    +				<groupId>org.apache.maven.plugins</groupId>
    +				<artifactId>maven-shade-plugin</artifactId>
    +				<executions>
    +					<execution>
    +						<id>shade-flink</id>
    +						<phase>package</phase>
    +						<goals>
    +							<goal>shade</goal>
    +						</goals>
    +						<configuration>
    +							<relocations combine.children="append">
    +								<relocation>
    +									<pattern>com.fasterxml.jackson.core</pattern>
    +									<shadedPattern>org.apache.flink.shaded.com.fasterxml.jackson.core</shadedPattern>
    --- End diff --
    
    We may need to qualify this further with this project's name, because we already use that relocation pattern in other places, potentially for different Jackson versions.
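    
    For example (the exact prefix below is hypothetical, the naming is up for discussion):
    
    ```xml
    <relocation>
    	<pattern>com.fasterxml.jackson.core</pattern>
    	<shadedPattern>org.apache.flink.formats.avro.registry.confluent.shaded.com.fasterxml.jackson.core</shadedPattern>
    </relocation>
    ```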


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by cricket007 <gi...@git.apache.org>.
Github user cricket007 commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    What about implementing a `KeyedDeserializationSchema` for Avro?
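    
    A rough sketch of what such a wrapper could look like (hypothetical class, not part of this PR; assumes the Kafka connector's `KeyedDeserializationSchema` interface and only Avro-decodes the message value):
    
    ```java
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    
    import java.io.IOException;
    
    /** Hypothetical wrapper that deserializes only the Kafka message value as Avro. */
    public class KeyedAvroDeserializationSchema<T> implements KeyedDeserializationSchema<T> {
    
    	private static final long serialVersionUID = 1L;
    
    	private final AvroDeserializationSchema<T> valueSchema;
    
    	public KeyedAvroDeserializationSchema(AvroDeserializationSchema<T> valueSchema) {
    		this.valueSchema = valueSchema;
    	}
    
    	@Override
    	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
    		return valueSchema.deserialize(message);
    	}
    
    	@Override
    	public boolean isEndOfStream(T nextElement) {
    		return valueSchema.isEndOfStream(nextElement);
    	}
    
    	@Override
    	public TypeInformation<T> getProducedType() {
    		return valueSchema.getProducedType();
    	}
    }
    ```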


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188319368
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T>
    +	extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(
    +		Class<T> recordClazz,
    +		@Nullable Schema reader,
    +		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(
    +		Schema schema, String url) {
    +		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    --- End diff --
    
    Same as above


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5995


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188355390
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -99,9 +108,29 @@
     
     	/**
     	 * Creates a new AvroSerializer for the type indicated by the given class.
    +	 * This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
    +	 * For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
     	 */
     	public AvroSerializer(Class<T> type) {
    +		Preconditions.checkArgument(!isGenericRecord(type),
    --- End diff --
    
    Minor: other precondition checks in this class are done via statically imported methods. While this is not consistent across the code base, I would suggest keeping it consistent within a class as much as possible.
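    
    For example (the message text is only illustrative):
    
    ```java
    import static org.apache.flink.util.Preconditions.checkArgument;
    
    // ...
    checkArgument(!isGenericRecord(type),
    	"For GenericData.Record use the AvroSerializer(Class, Schema) constructor.");
    ```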


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188349785
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
    + *
    + * @param <T> type of record it produces
    + */
    +public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    +	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
    --- End diff --
    
    We typically do empty lines between class declaration and members.


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    Thanks, the main code looks good!
    
    Unfortunately, this seems to either break compatibility with prior savepoints (where Avro types were implicitly handled through Kryo and are now bridged through the `BackwardsCompatibleAvroSerializer`), or the test needs to be adjusted.
    
    There are also some license header issues, causing the build to fail.


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    I've addressed your comments, @StephanEwen. If you don't have any more, I will merge it today.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188319278
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T>
    +	extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(
    +		Class<T> recordClazz,
    +		@Nullable Schema reader,
    +		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    --- End diff --
    
    For such situations, the code in the method body and the parameter list should use different indentation, or be separated by an empty line. Otherwise it is hard to parse.
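    
    For example, double indentation for the parameter list (or an empty line before the body) keeps the two apart:
    
    ```java
    private ConfluentRegistryAvroDeserializationSchema(
    		Class<T> recordClazz,
    		@Nullable Schema reader,
    		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    
    	super(recordClazz, reader, schemaCoderProvider);
    }
    ```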


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188348636
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    --- End diff --
    
    That would mean initializing it all lazily, in `getDatumReader()` or in a `checkAvroInitialized()` method.
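    
    Roughly like this: move what `readObject()` does today into a guard that `deserialize()` calls first (just a sketch):
    
    ```java
    private void checkAvroInitialized() {
    	if (inputStream != null) {
    		return; // already initialized
    	}
    
    	this.inputStream = new MutableByteArrayInputStream();
    	this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    	if (schemaString != null) {
    		this.reader = new Schema.Parser().parse(schemaString);
    	}
    }
    ```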


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r198156227
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import org.apache.flink.formats.avro.AvroDeserializationSchema;
    +import org.apache.flink.formats.avro.RegistryAvroDeserializationSchema;
    +import org.apache.flink.formats.avro.SchemaCoder;
    +
    +import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder} that uses
    + * Confluent Schema Registry.
    + *
    + * @param <T> type of record it produces
    + */
    +public class ConfluentRegistryAvroDeserializationSchema<T> extends RegistryAvroDeserializationSchema<T> {
    +
    +	private static final int DEFAULT_IDENTITY_MAP_CAPACITY = 1000;
    +
    +	private static final long serialVersionUID = -1671641202177852775L;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider provider for schema coder that reads writer schema from Confluent Schema Registry
    +	 */
    +	private ConfluentRegistryAvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader,
    +			SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader, schemaCoderProvider);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @param url    url of schema registry to connect
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url) {
    +		return forGeneric(schema, url, DEFAULT_IDENTITY_MAP_CAPACITY);
    +	}
    +
    +	/**
    +	 * Creates {@link ConfluentRegistryAvroDeserializationSchema} that produces {@link GenericRecord}
    +	 * using provided reader schema and looks up writer schema in Confluent Schema Registry.
    +	 *
    +	 * @param schema              schema of produced records
    +	 * @param url                 url of schema registry to connect
    +	 * @param identityMapCapacity maximum number of cached schema versions (default: 1000)
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static ConfluentRegistryAvroDeserializationSchema<GenericRecord> forGeneric(Schema schema, String url,
    --- End diff --
    
    End users are supposed to use only this method or `forSpecific`, and no other. Therefore it must be public.
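    
    For illustration, the intended end-user usage is along these lines (registry URL, topic and connector version are placeholders):
    
    ```java
    DeserializationSchema<GenericRecord> schema =
    	ConfluentRegistryAvroDeserializationSchema.forGeneric(avroSchema, "http://registry:8081");
    
    FlinkKafkaConsumer011<GenericRecord> consumer =
    	new FlinkKafkaConsumer011<>("my-topic", schema, kafkaProperties);
    ```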


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188347289
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    --- End diff --
    
    This class uses a mixture of eager initialization of transient members (in readObject()) and lazy initialization (in getDatumReader()).
    
    I would suggest doing it all one way or the other.
    
    My suggestion would be to avoid `readObject()` whenever possible. If you encounter an exception during schema parsing (and it may be something weird from Jackson, like a missing manifest due to a shading issue), you will get the most unhelpful exception stack trace ever, in the weirdest place (like Flink's RPC message decoder).
    
    In my experience, when a user sees such a stack trace, they are rarely able to diagnose that. Best case they show up on the mailing list, worst case they give up.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r189185186
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    +
    +	/**
    +	 * Reader that deserializes byte array into a record.
    +	 */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/**
    +	 * Input stream to read message from.
    +	 */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private transient Decoder decoder;
    +
    +	/**
    +	 * Avro schema for the reader.
    +	 */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		if (datumReader != null) {
    +			return datumReader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new SpecificDatumReader<>();
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new GenericDatumReader<>();
    +		} else {
    +			this.datumReader = new ReflectDatumReader<>();
    +		}
    +
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		if (reader != null) {
    +			return reader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.reader = SpecificData.get().getSchema(recordClazz);
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			throw new IllegalStateException(
    +				"Cannot infer schema for generic record. Please pass explicit schema in the ctor.");
    +		} else {
    +			this.reader = ReflectData.get().getSchema(recordClazz);
    +		}
    +
    +		return reader;
    +	}
    +
    +	MutableByteArrayInputStream getInputStream() {
    +		return inputStream;
    +	}
    +
    +	Decoder getDecoder() {
    +		return decoder;
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			inputStream.setBuffer(message);
    +			Schema readerSchema = getReaderSchema();
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(readerSchema);
    +
    +			return datumReader.read(null, decoder);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize message.", e);
    +		}
    +	}
    +
    +	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
    +		ois.defaultReadObject();
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		if (schemaString != null) {
    +			this.reader = new Schema.Parser().parse(schemaString);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEndOfStream(T nextElement) {
    +		return false;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public TypeInformation<T> getProducedType() {
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			return new AvroTypeInfo(recordClazz, false);
    --- End diff --
    
    Oh, I see. I guess you can push the type info creation into a static method with a different generic scope, but that would not be simpler to read. I would keep it as it is, then.
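    
    For reference, such a helper would look roughly like the hypothetical sketch below, and the generic/reflect cases would still need an unchecked cast at the call site:
    
    ```java
    // Hypothetical helper with its own generic scope for the specific-record case.
    private static <S extends SpecificRecordBase> TypeInformation<S> specificTypeInfo(Class<S> clazz) {
    	return new AvroTypeInfo<>(clazz, false);
    }
    ```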



---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188338897
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    --- End diff --
    
    You can make this variable final as well.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188386920
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSerializer.java ---
    @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
     	//  Utilities
     	// ------------------------------------------------------------------------
     
    +	private static boolean isGenericRecord(Class<?> type) {
    +		return !SpecificRecord.class.isAssignableFrom(type) &&
    +			GenericRecord.class.isAssignableFrom(type);
    +	}
    +
     	@Override
     	public TypeSerializer<T> duplicate() {
    -		return new AvroSerializer<>(type);
    +		if (schemaString != null) {
    +			return new AvroSerializer<>(type, new Schema.Parser().parse(schemaString));
    --- End diff --
    
    I didn't think it through properly. I thought we needed to create a deep copy of the schema, but since it is stateless I think we can just pass the schema along. My mistake. Correct me if I am wrong.
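    
    Something like this, then (sketch; assumes the serializer keeps the parsed schema in a transient field, called `schema` here):
    
    ```java
    @Override
    public TypeSerializer<T> duplicate() {
    	// Share the parsed Schema instead of re-parsing the string; assumes the
    	// Schema object is not mutated after construction.
    	return schema != null ? new AvroSerializer<>(type, schema) : new AvroSerializer<>(type);
    }
    ```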


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188350196
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroDeserializationSchema.java ---
    @@ -0,0 +1,82 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format using {@link SchemaCoder}.
    + *
    + * @param <T> type of record it produces
    + */
    +public class RegistryAvroDeserializationSchema<T> extends AvroDeserializationSchema<T> {
    +	private final SchemaCoder.SchemaCoderProvider schemaCoderProvider;
    +	private transient SchemaCoder schemaCoder;
    +
    +	/**
    +	 * Creates Avro deserialization schema that reads schema from input stream using provided {@link SchemaCoder}.
    +	 *
    +	 * @param recordClazz         class to which deserialize. Should be either
    +	 *                            {@link SpecificRecord} or {@link GenericRecord}.
    +	 * @param reader              reader's Avro schema. Should be provided if recordClazz is
    +	 *                            {@link GenericRecord}
    +	 * @param schemaCoderProvider schema provider that allows instantiation of {@link SchemaCoder} that will be used for
    +	 *                            schema reading
    +	 */
    +	protected RegistryAvroDeserializationSchema(
    +		Class<T> recordClazz,
    +		@Nullable Schema reader,
    +		SchemaCoder.SchemaCoderProvider schemaCoderProvider) {
    +		super(recordClazz, reader);
    +		this.schemaCoderProvider = schemaCoderProvider;
    +		this.schemaCoder = schemaCoderProvider.get();
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			getInputStream().setBuffer(message);
    +			Schema writerSchema = schemaCoder.readSchema(getInputStream());
    +			Schema readerSchema = getReaderSchema();
    +
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(writerSchema);
    +			datumReader.setExpected(readerSchema);
    +
    +			return datumReader.read(null, getDecoder());
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize Row.", e);
    +		}
    +	}
    +
    +	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
    --- End diff --
    
    See above, I would suggest avoiding readObject().


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    I would actually keep the package name for now. It makes sense, because the connection to the registry is Avro-specific at the moment...


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188337378
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    --- End diff --
    
    This should have a serial version UID.
    You can activate the respective inspections in IntelliJ to warn about such issues.
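    
    For example:
    
    ```java
    private static final long serialVersionUID = 1L;
    ```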


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188328236
  
    --- Diff: flink-formats/flink-avro-confluent-registry/src/test/java/org/apache/flink/formats/avro/registry/confluent/ConfluentSchemaRegistryCoderTest.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro.registry.confluent;
    +
    +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
    +import org.apache.avro.Schema;
    +import org.apache.avro.SchemaBuilder;
    +import org.junit.Test;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.DataOutputStream;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +/**
    + * Tests for {@link ConfluentSchemaRegistryCoder}.
    + */
    +public class ConfluentSchemaRegistryCoderTest {
    --- End diff --
    
    Do we want to test the magic byte verification?
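    
    Something along these lines, for example (sketch; assumes the coder is constructed with a `SchemaRegistryClient` and signals a wrong magic byte with an `IOException`):
    
    ```java
    @Test(expected = IOException.class)
    public void testMagicByteVerification() throws Exception {
    	MockSchemaRegistryClient client = new MockSchemaRegistryClient();
    	ConfluentSchemaRegistryCoder coder = new ConfluentSchemaRegistryCoder(client);
    
    	ByteArrayOutputStream byteOutStream = new ByteArrayOutputStream();
    	DataOutputStream dataOutputStream = new DataOutputStream(byteOutStream);
    	dataOutputStream.writeByte(5); // anything but the expected magic byte 0
    	dataOutputStream.writeInt(123);
    	dataOutputStream.flush();
    
    	coder.readSchema(new ByteArrayInputStream(byteOutStream.toByteArray()));
    }
    ```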


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188329273
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    --- End diff --
    
    Minor comment: Many newer classes pick up a style where the JavaDocs of fields are in one line, to make the fields section a bit more compact:
    ```
    /** Class to deserialize to. */
    private Class<T> recordClazz;
    ```


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188336725
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    --- End diff --
    
    All fields should be final whenever possible - immutability as the default choice.
    That both documents the writer's intention and makes the code future-proof.
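    
    E.g. for the field above (only the lazily initialized transient fields really need to stay non-final):
    ```
    /** Class to deserialize to. */
    private final Class<T> recordClazz;
    ```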


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    Right, sorry for that. I changed the data generator a bit, so it produced different results than before with the same seed. I've recreated the serialized data with the updated `TestDataGenerator`. It took me a while, though, to figure out that it should be created with the current code rather than the 1.3 branch, so I updated the comment accordingly.
    
    I also reworded a few other names, as the `BackwardsCompatibleAvroSerializerTest` does not test compatibility with 1.3, but only with the `PojoSerializer`.


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188340240
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    +
    +	/**
    +	 * Reader that deserializes byte array into a record.
    +	 */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/**
    +	 * Input stream to read message from.
    +	 */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private transient Decoder decoder;
    +
    +	/**
    +	 * Avro schema for the reader.
    +	 */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		if (datumReader != null) {
    +			return datumReader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new SpecificDatumReader<>();
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new GenericDatumReader<>();
    +		} else {
    +			this.datumReader = new ReflectDatumReader<>();
    +		}
    +
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		if (reader != null) {
    +			return reader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.reader = SpecificData.get().getSchema(recordClazz);
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			throw new IllegalStateException(
    +				"Cannot infer schema for generic record. Please pass explicit schema in the ctor.");
    +		} else {
    +			this.reader = ReflectData.get().getSchema(recordClazz);
    +		}
    +
    +		return reader;
    +	}
    +
    +	MutableByteArrayInputStream getInputStream() {
    +		return inputStream;
    +	}
    +
    +	Decoder getDecoder() {
    +		return decoder;
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			inputStream.setBuffer(message);
    +			Schema readerSchema = getReaderSchema();
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(readerSchema);
    +
    +			return datumReader.read(null, decoder);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize message.", e);
    +		}
    +	}
    +
    +	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
    +		ois.defaultReadObject();
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		if (schemaString != null) {
    +			this.reader = new Schema.Parser().parse(schemaString);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEndOfStream(T nextElement) {
    +		return false;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public TypeInformation<T> getProducedType() {
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			return new AvroTypeInfo(recordClazz, false);
    --- End diff --
    
    Avoid the use of raw types. A generic variant should be possible here:
    `return new AvroTypeInfo<>(recordClazz.asSubclass(SpecificRecord.class), false);`


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r189185420
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericData;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	private static final long serialVersionUID = -6766681879020862312L;
    +
    +	/** Class to deserialize to. */
    +	private final Class<T> recordClazz;
    +
    +	/** Schema in case of GenericRecord for serialization purpose. */
    +	private final String schemaString;
    +
    +	/** Reader that deserializes byte array into a record. */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/** Input stream to read message from. */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/** Avro decoder that decodes binary data. */
    +	private transient Decoder decoder;
    +
    +	/** Avro schema for the reader. */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    --- End diff --
    
    I would skip the initialization in the constructor if you have the initialization in `checkAvroInitialized()`. Simpler, and it avoids having two places that do the initialization and have to be kept in sync.
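    
    Roughly what I have in mind (just a sketch reusing the PR's field names; the reflect-data branch is elided for brevity):
    ```
    private void checkAvroInitialized() {
        if (datumReader != null) {
            return;
        }
    
        this.inputStream = new MutableByteArrayInputStream();
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
        if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
            this.datumReader = new SpecificDatumReader<>(getReaderSchema());
        } else {
            this.datumReader = new GenericDatumReader<>(getReaderSchema());
        }
    }
    
    @Override
    public T deserialize(byte[] message) throws IOException {
        checkAvroInitialized();
        inputStream.setBuffer(message);
        return datumReader.read(null, decoder);
    }
    ```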


---

[GitHub] flink pull request #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5995#discussion_r188385527
  
    --- Diff: flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroDeserializationSchema.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.formats.avro;
    +
    +import org.apache.flink.api.common.serialization.DeserializationSchema;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
    +import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    +import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
    +import org.apache.flink.util.Preconditions;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.avro.generic.GenericRecord;
    +import org.apache.avro.io.Decoder;
    +import org.apache.avro.io.DecoderFactory;
    +import org.apache.avro.reflect.ReflectData;
    +import org.apache.avro.reflect.ReflectDatumReader;
    +import org.apache.avro.specific.SpecificData;
    +import org.apache.avro.specific.SpecificDatumReader;
    +import org.apache.avro.specific.SpecificRecord;
    +
    +import javax.annotation.Nullable;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +
    +/**
    + * Deserialization schema that deserializes from Avro binary format.
    + *
    + * @param <T> type of record it produces
    + */
    +public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
    +
    +	/**
    +	 * Class to deserialize to.
    +	 */
    +	private Class<T> recordClazz;
    +
    +	private String schemaString = null;
    +
    +	/**
    +	 * Reader that deserializes byte array into a record.
    +	 */
    +	private transient GenericDatumReader<T> datumReader;
    +
    +	/**
    +	 * Input stream to read message from.
    +	 */
    +	private transient MutableByteArrayInputStream inputStream;
    +
    +	/**
    +	 * Avro decoder that decodes binary data.
    +	 */
    +	private transient Decoder decoder;
    +
    +	/**
    +	 * Avro schema for the reader.
    +	 */
    +	private transient Schema reader;
    +
    +	/**
    +	 * Creates a Avro deserialization schema.
    +	 *
    +	 * @param recordClazz class to which deserialize. Should be one of:
    +	 *                    {@link org.apache.avro.specific.SpecificRecord},
    +	 *                    {@link org.apache.avro.generic.GenericRecord}.
    +	 * @param reader      reader's Avro schema. Should be provided if recordClazz is
    +	 *                    {@link GenericRecord}
    +	 */
    +	AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
    +		Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
    +		this.recordClazz = recordClazz;
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		this.reader = reader;
    +		if (reader != null) {
    +			this.schemaString = reader.toString();
    +		}
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
    +	 *
    +	 * @param schema schema of produced records
    +	 * @return deserialized record in form of {@link GenericRecord}
    +	 */
    +	public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
    +		return new AvroDeserializationSchema<>(GenericRecord.class, schema);
    +	}
    +
    +	/**
    +	 * Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
    +	 *
    +	 * @param tClass class of record to be produced
    +	 * @return deserialized record
    +	 */
    +	public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
    +		return new AvroDeserializationSchema<>(tClass, null);
    +	}
    +
    +	GenericDatumReader<T> getDatumReader() {
    +		if (datumReader != null) {
    +			return datumReader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new SpecificDatumReader<>();
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			this.datumReader = new GenericDatumReader<>();
    +		} else {
    +			this.datumReader = new ReflectDatumReader<>();
    +		}
    +
    +		return datumReader;
    +	}
    +
    +	Schema getReaderSchema() {
    +		if (reader != null) {
    +			return reader;
    +		}
    +
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			this.reader = SpecificData.get().getSchema(recordClazz);
    +		} else if (GenericRecord.class.isAssignableFrom(recordClazz)) {
    +			throw new IllegalStateException(
    +				"Cannot infer schema for generic record. Please pass explicit schema in the ctor.");
    +		} else {
    +			this.reader = ReflectData.get().getSchema(recordClazz);
    +		}
    +
    +		return reader;
    +	}
    +
    +	MutableByteArrayInputStream getInputStream() {
    +		return inputStream;
    +	}
    +
    +	Decoder getDecoder() {
    +		return decoder;
    +	}
    +
    +	@Override
    +	public T deserialize(byte[] message) {
    +		// read record
    +		try {
    +			inputStream.setBuffer(message);
    +			Schema readerSchema = getReaderSchema();
    +			GenericDatumReader<T> datumReader = getDatumReader();
    +
    +			datumReader.setSchema(readerSchema);
    +
    +			return datumReader.read(null, decoder);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Failed to deserialize message.", e);
    +		}
    +	}
    +
    +	private void readObject(ObjectInputStream ois) throws ClassNotFoundException, IOException {
    +		ois.defaultReadObject();
    +		this.inputStream = new MutableByteArrayInputStream();
    +		this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    +		if (schemaString != null) {
    +			this.reader = new Schema.Parser().parse(schemaString);
    +		}
    +	}
    +
    +	@Override
    +	public boolean isEndOfStream(T nextElement) {
    +		return false;
    +	}
    +
    +	@Override
    +	@SuppressWarnings("unchecked")
    +	public TypeInformation<T> getProducedType() {
    +		if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
    +			return new AvroTypeInfo(recordClazz, false);
    --- End diff --
    
    Agreed, raw types should be avoided. Unfortunately, I cannot find a way to avoid one here. Your solution does not work because `recordClazz.asSubclass(..)` returns `Class<? extends SpecificRecord>`, which does not conform to the `T` of the returned type.
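    
    The closest I could get is an explicit unchecked cast, which is arguably no nicer than the raw type. Sketch only (I am assuming `AvroTypeInfo`'s type bound is `SpecificRecordBase`; otherwise the `asSubclass` argument needs adapting):
    ```
    if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
        @SuppressWarnings("unchecked")
        TypeInformation<T> typeInfo = (TypeInformation<T>) new AvroTypeInfo<>(
            recordClazz.asSubclass(SpecificRecordBase.class), false);
        return typeInfo;
    }
    ```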


---

[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/5995
  
    Also, as for the package name or the place to put it, I don't feel competent to suggest one, so I will be happy to apply your suggestion.


---