Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/12/18 15:15:29 UTC

[GitHub] Zhen-hao closed pull request #7301: [FLINK-11160][Kafka Connector]

Zhen-hao closed pull request #7301: [FLINK-11160][Kafka Connector]
URL: https://github.com/apache/flink/pull/7301
 
 
   

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-formats/flink-avro-confluent-registry/pom.xml b/flink-formats/flink-avro-confluent-registry/pom.xml
index 4c99f71e544..724a22987da 100644
--- a/flink-formats/flink-avro-confluent-registry/pom.xml
+++ b/flink-formats/flink-avro-confluent-registry/pom.xml
@@ -40,7 +40,22 @@ under the License.
 		<dependency>
 			<groupId>io.confluent</groupId>
 			<artifactId>kafka-schema-registry-client</artifactId>
-			<version>3.3.1</version>
+			<version>4.1.2</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.avro</groupId>
+					<artifactId>avro</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>org.slf4j</groupId>
+					<artifactId>slf4j-log4j12</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>4.1.2</version>
 			<exclusions>
 				<exclusion>
 					<groupId>org.apache.avro</groupId>
@@ -63,6 +78,17 @@ under the License.
 			<artifactId>flink-avro</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.scala-lang</groupId>
+			<artifactId>scala-library</artifactId>
+			<version>${scala.version}</version>
+			<scope>provided</scope>
+		</dependency>
 	</dependencies>
 
 	<build>
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair.java
new file mode 100644
index 00000000000..dbcc426f6f0
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import scala.Tuple2;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serialization schema that serializes key-value Scala pairs of {@link GenericRecord} to Avro binary
+ * format using a {@link KafkaAvroSerializer} backed by the Confluent Schema Registry.
+ */
+public class ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair
+        implements KeyedSerializationSchema<Tuple2<GenericRecord, GenericRecord>> {
+
+
+    private final String topic;
+    private final File valueSchemaFile;
+    private final File keySchemaFile;
+    private final String registryURL;
+    private transient Schema valueSchema;
+    private transient Schema keySchema;
+
+    private transient KafkaAvroSerializer valueSerializer;
+    private transient KafkaAvroSerializer keySerializer;
+
+    /**
+     * Creates a keyed Avro serialization schema.
+     * The constructor takes files instead of {@link Schema} because {@link Schema} is not serializable.
+     *
+     * @param topic             Kafka topic to write to
+     * @param registryURL       URL of the schema registry to connect to
+     * @param keySchemaFile     file of the Avro writer schema used by the key
+     * @param valueSchemaFile   file of the Avro writer schema used by the value
+     */
+    public ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair(String topic, String registryURL, File keySchemaFile, File valueSchemaFile) {
+        this.topic = topic;
+        this.registryURL = registryURL;
+        this.valueSchemaFile = valueSchemaFile;
+        this.keySchemaFile = keySchemaFile;
+    }
+
+    /**
+     * Optional method to determine the target topic for the element.
+     *
+     * @param key_value input Scala pair of key and value
+     * @return the configured target topic
+     */
+    @Override
+    public String getTargetTopic(Tuple2<GenericRecord, GenericRecord> key_value) {
+        return topic;
+    }
+
+    /**
+     * Serializes the key of the input Scala pair of key and value.
+     *
+     * @param key_value input Scala pair of key and value
+     * @return          byte array of the serialized key
+     */
+    @Override
+    public byte[] serializeKey(Tuple2<GenericRecord, GenericRecord> key_value) {
+        if (this.keySerializer == null) {
+            Schema.Parser parser = new Schema.Parser();
+            try {
+                this.keySchema = parser.parse(keySchemaFile);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Cannot parse key Avro writer schema file: " + keySchemaFile, e);
+            }
+            CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(registryURL, 100); // 100 = identity map capacity (max cached schemas)
+            this.keySerializer = new KafkaAvroSerializer(schemaRegistry);
+            Map<String, Object> config = new HashMap<>();
+            config.put("schema.registry.url", registryURL);
+            this.keySerializer.configure(config, true); // true: configure as the key serializer
+        }
+        List<Schema.Field> fields = keySchema.getFields();
+        GenericRecord reconstructedRecord = new GenericData.Record(keySchema);
+        for (Schema.Field field : fields) {
+            String name = field.name();
+            reconstructedRecord.put(name, key_value._1.get(name));
+        }
+        return keySerializer.serialize(topic, reconstructedRecord);
+    }
+
+    /**
+     * Serializes the value of the input Scala pair of key and value.
+     *
+     * @param key_value input Scala pair of key and value
+     * @return          byte array of the serialized value
+     */
+    @Override
+    public byte[] serializeValue(Tuple2<GenericRecord, GenericRecord> key_value) {
+        if (this.valueSerializer == null) {
+            Schema.Parser parser = new Schema.Parser();
+            try {
+                this.valueSchema = parser.parse(valueSchemaFile);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Cannot parse value Avro writer schema file: " + valueSchemaFile, e);
+            }
+            CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(registryURL, 500); // 500 = identity map capacity (max cached schemas)
+            this.valueSerializer = new KafkaAvroSerializer(schemaRegistry); // unconfigured: defaults to value (isKey = false) subject naming
+        }
+        List<Schema.Field> fields = valueSchema.getFields();
+        GenericRecord reconstructedRecord = new GenericData.Record(valueSchema);
+        for (Schema.Field field : fields) {
+            String name = field.name();
+            reconstructedRecord.put(name, key_value._2.get(name));
+        }
+        return valueSerializer.serialize(topic, reconstructedRecord);
+    }
+
+}
diff --git a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroSerializationSchemaForGenericRecord.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroSerializationSchemaForGenericRecord.java
new file mode 100644
index 00000000000..0ed02294a38
--- /dev/null
+++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentAvroSerializationSchemaForGenericRecord.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.confluent;
+
+
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serialization schema that serializes instances of {@link GenericRecord} to Avro binary format
+ * using a {@link KafkaAvroSerializer} backed by the Confluent Schema Registry.
+ */
+public class ConfluentAvroSerializationSchemaForGenericRecord
+        implements SerializationSchema<GenericRecord> {
+
+    private final String topic;
+    private final File schemaFile;
+    private final String registryURL;
+    private transient Schema externalSchema;
+    private transient KafkaAvroSerializer encoder;
+
+    /**
+     * Creates an Avro serialization schema.
+     * The constructor takes files instead of {@link Schema} because {@link Schema} is not serializable.
+     *
+     * @param topic             Kafka topic to write to
+     * @param registryURL       URL of the schema registry to connect to
+     * @param schemaFile        file of the Avro writer schema
+     */
+    public ConfluentAvroSerializationSchemaForGenericRecord(String topic, String registryURL, File schemaFile) {
+        this.topic = topic;
+        this.registryURL = registryURL;
+        this.schemaFile = schemaFile;
+    }
+
+    /**
+     * Serializes the input record.
+     *
+     * @param element   input record
+     * @return          byte array of the serialized value
+     */
+    @Override
+    public byte[] serialize(GenericRecord element) {
+        if (this.encoder == null) {
+            Schema.Parser parser = new Schema.Parser();
+            try {
+                this.externalSchema = parser.parse(schemaFile);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Cannot parse Avro writer schema file: " + schemaFile, e);
+            }
+            CachedSchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(registryURL, 100); // 100 = identity map capacity (max cached schemas)
+            this.encoder = new KafkaAvroSerializer(schemaRegistry); // unconfigured: defaults to value (isKey = false) subject naming
+        }
+        List<Schema.Field> fields = externalSchema.getFields();
+        GenericRecord reconstructedRecord = new GenericData.Record(externalSchema);
+        for (Schema.Field field : fields) {
+            String key = field.name();
+            reconstructedRecord.put(key, element.get(key));
+        }
+        return encoder.serialize(topic, reconstructedRecord);
+    }
+
+}
+
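
For context beyond the diff itself, here is a minimal usage sketch (not part of the PR) showing how the keyed schema above could be wired into the universal FlinkKafkaProducer that the pom change pulls in. The topic, broker address, registry URL, schema file paths, and the upstream source are placeholder assumptions; the schema files must be readable on every task manager, since they are re-parsed lazily per subtask:

    import java.io.File;
    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import scala.Tuple2;

    public class KeyedProducerSketch {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical upstream source of Scala key/value pairs of GenericRecord.
            DataStream<Tuple2<GenericRecord, GenericRecord>> pairs = buildPairStream(env);

            Properties producerConfig = new Properties();
            producerConfig.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

            // Files are passed instead of Schema objects because Schema is not serializable.
            ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair schema =
                    new ConfluentAvroKeyedSerializationSchemaForGenericRecordScalaPair(
                            "my-topic",                       // placeholder topic
                            "http://localhost:8081",          // placeholder registry URL
                            new File("/path/to/key.avsc"),    // placeholder key writer schema
                            new File("/path/to/value.avsc")); // placeholder value writer schema

            pairs.addSink(new FlinkKafkaProducer<>(
                    "my-topic", schema, producerConfig,
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));

            env.execute("confluent-avro-keyed-sketch");
        }

        // Placeholder: a real job would read from Kafka, files, etc.
        private static DataStream<Tuple2<GenericRecord, GenericRecord>> buildPairStream(
                StreamExecutionEnvironment env) {
            throw new UnsupportedOperationException("source elided in this sketch");
        }
    }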
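
A similar hedged sketch for the value-only ConfluentAvroSerializationSchemaForGenericRecord, using the plain SerializationSchema-based producer constructor; again every broker, topic, registry URL, and file path is a placeholder:

    import java.io.File;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentAvroSerializationSchemaForGenericRecord;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    public class ValueOnlyProducerSketch {

        // Attaches a Confluent-Avro Kafka sink to a hypothetical stream of GenericRecord values.
        public static void attachSink(DataStream<GenericRecord> records) {
            records.addSink(new FlinkKafkaProducer<>(
                    "localhost:9092", // placeholder broker list
                    "my-topic",       // placeholder topic
                    new ConfluentAvroSerializationSchemaForGenericRecord(
                            "my-topic",
                            "http://localhost:8081",            // placeholder registry URL
                            new File("/path/to/value.avsc")))); // placeholder writer schema
        }
    }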


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services