Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/05/05 03:44:09 UTC

[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r419848101



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       Let me find it. We had to patch the Kafka -> Avro deserializer in Hudi 0.5 because it was breaking in some situations.
   In general, based on the spec at https://avro.apache.org/docs/current/spec.html:
   ```
   Data Serialization and Deserialization

   Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema.

   Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution.
   ```
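   For illustration only (this is not part of the PR), here is a minimal, self-contained sketch of what that passage implies with the plain Avro Java APIs: binary-encoded bytes carry no schema, so the reader must be handed the exact writer's schema, and may additionally supply a newer reader schema so that Avro performs schema resolution. The `Event` record and its fields below are made up for the example.
   ```java
   // Hypothetical schemas, used only to demonstrate writer-schema decoding and
   // schema resolution; they do not come from the Hudi PR under review.
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryDecoder;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   import java.io.ByteArrayOutputStream;
   import java.io.IOException;

   public class AvroSchemaResolutionSketch {

     public static void main(String[] args) throws IOException {
       // Writer schema: a single required field.
       Schema writerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"string\"}]}");

       // "Newer" reader schema: adds a field with a default, which Avro
       // fills in during schema resolution.
       Schema readerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"string\"},"
               + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");

       // Serialize with the writer schema: no type information or field names
       // end up in the resulting bytes.
       GenericRecord record = new GenericData.Record(writerSchema);
       record.put("id", "evt-1");
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
       encoder.flush();
       byte[] payload = out.toByteArray();

       // Deserialize: the writer's schema is mandatory; the reader schema is
       // resolved against it per the Schema Resolution rules.
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
       GenericDatumReader<GenericRecord> reader =
           new GenericDatumReader<>(writerSchema, readerSchema);
       GenericRecord decoded = reader.read(null, decoder);
       System.out.println(decoded); // {"id": "evt-1", "source": "unknown"}
     }
   }
   ```
   This is why the deserializer in this PR is constructed with the source schema from `FilebasedSchemaProvider`: without a schema registry, the reader has to obtain the writer's schema some other way.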




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org