Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/04/26 19:11:14 UTC

[GitHub] [incubator-hudi] pratyakshsharma opened a new pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

pratyakshsharma opened a new pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   Supports Kafka installations without a schema registry by allowing FilebasedSchemaProvider to be integrated with AvroKafkaSource.
   
   ## Brief change log
   
   - Implemented HoodieKafkaAvroDecoder.
   - Introduced a property for configuring AvroKafkaSource with or without a schema-registry setup (sketched below).
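   
   A minimal sketch of how such a setup might be wired. The FilebasedSchemaProvider key below exists in Hudi; the registry toggle is hypothetical and may differ from the property this PR actually introduces:
   
   ```java
   import java.util.Properties;
   
   class AvroKafkaSourceConfigSketch {
     static Properties props() {
       Properties props = new Properties();
       // Existing Hudi key: points FilebasedSchemaProvider at an .avsc file.
       props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
           "/path/to/source.avsc");
       // Hypothetical toggle (the exact key is defined by this PR and may differ):
       props.setProperty("hoodie.deltastreamer.source.use.schema.registry", "false");
       return props;
     }
   }
   ```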
   
   ## Verify this pull request
   
   *(Please pick one of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416505404



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       So basically what I want to understand here is: what specific advantages do you get by having separate writer and reader schemas, apart from being able to handle field renames using aliases? I tried to go through the Avro library, but am still not convinced about this. If you could point me to some useful documentation or a blog regarding this, that would be great. 




[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416344601



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();

Review comment:
       this looks specific to the Confluent schema registry. Is it a good idea to assume all the messages will start with the magic int? 
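
    For context, the framing assumed by `getByteBuffer()` and `buffer.getInt()` is Confluent's wire format. A sketch of what the class's `getByteBuffer` presumably does (it is not shown in the quoted hunk), mirroring Confluent's AbstractKafkaAvroDeserializer; imports are as in the quoted file:

    ```java
    // Confluent wire format: [magic byte 0x0][4-byte big-endian schema id][Avro binary payload].
    // Messages produced without a Confluent serializer carry no such prefix and
    // would fail (or be silently misread) here -- the concern raised above.
    private ByteBuffer getByteBuffer(byte[] payload) {
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != 0x0) {
        throw new SerializationException("Unknown magic byte!");
      }
      return buffer; // position is now 1; the caller's buffer.getInt() reads the schema id
    }
    ```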

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();

Review comment:
       it also looks like an older version of https://github.com/confluentinc/schema-registry/commits/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java
   
   any reason it is not inspired by the latest version? 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       it looks like sourceSchema is used as both the writer and the reader schema. Why? Avro is painful when the writer schema is lost, as some evolutions become impossible. 
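
    A minimal sketch of that point against the standard Avro API (names here are illustrative): a resolving decode takes both schemas, so collapsing them into one forfeits the evolutions mentioned above:

    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;

    import java.io.IOException;

    class ResolvingDecodeSketch {
      // writerSchema: the schema the producer actually used for these bytes.
      // readerSchema: the schema the consumer wants records projected into.
      static GenericRecord decode(byte[] avroBytes, Schema writerSchema, Schema readerSchema)
          throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        // Avro aligns the two schemas field by field: fields added in the reader
        // schema are filled from defaults, removed fields are skipped. Passing the
        // same schema twice makes such evolutions impossible.
        return reader.read(null, decoder);
      }
    }
    ```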

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();
+      int length = buffer.limit() - 1 - 4;
+      Object result;
+      if (sourceSchema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = this.getDatumReader(sourceSchema, readerSchema);

Review comment:
       what is the difference here between sourceSchema and readerSchema?




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416503101



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();

Review comment:
       > this looks specific to the Confluent schema registry. Is it a good idea to assume all the messages will start with the magic int?
   
   Ok, the actual purpose of this PR was to support Kafka installations without a schema registry. I messed it up at this point. Thank you for pointing this out. Will handle this accordingly. 
   
   > any reason it is not inspired by the latest version?
   
   No specific reason. The implementation is inspired by the kafka-avro-serializer version we are currently using. :) 




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r425111695



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       @afilipchik did you get a chance to explore this? 




[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416489207



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       I did not get this point. What do you mean by the writer schema getting lost? 




[GitHub] [incubator-hudi] pratyakshsharma commented on pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#issuecomment-629811530


   So handling schema evolution without a schema registry is going to be really tricky. I tried googling around this and found the two links below, which might be useful for what we want to achieve - 
   
   1. https://stackoverflow.com/questions/37290303/producing-and-consuming-avro-messages-from-kafka-without-confluent-components
   2. https://github.com/farmdawgnation/registryless-avro-converter
   
   In particular, the second repository aims at serializing and deserializing Avro data without a schema registry, using the Confluent and Avro libraries (a minimal sketch of that registry-less approach is shown below). At a high level, it looks like they are also not handling schema evolution in their code. I would need some time to go through it in depth, though. 
   Also, if you look at the description of the JIRA (https://issues.apache.org/jira/browse/HUDI-73), it mentions integrating AvroKafkaSource with FilebasedSchemaProvider (which is what is done in this PR :) ). If we really want to integrate it with FilebasedSchemaProvider, then I do not think it is feasible to handle schema evolution, since a user cannot keep changing schema files on every evolution. Thoughts? @vinothchandar 
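   
   For reference, a minimal sketch of the registry-less approach those links describe, using only the plain Avro API (illustrative names); producer and DeltaStreamer must then agree on the schema out of band, e.g. via the same .avsc file:
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.EncoderFactory;
   
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   
   class RegistrylessAvroSketch {
     // Plain Avro binary: no magic byte, no schema id -- nothing for a
     // registry-aware decoder to find, which is why the two paths differ.
     static byte[] serialize(GenericRecord record, Schema schema) throws IOException {
       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
       encoder.flush();
       return out.toByteArray();
     }
   }
   ```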


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r425432347



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();

Review comment:
       if we are reusing code, we also need to be mindful of updates needed to NOTICE and LICENSE.




[GitHub] [incubator-hudi] codecov-io commented on pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#issuecomment-619614841


   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=h1) Report
   > Merging [#1565](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/19ca0b56296bf89c611406bb86c3626f87d562ee&el=desc) will **decrease** coverage by `0.03%`.
   > The diff coverage is `60.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1565/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1565      +/-   ##
   ============================================
   - Coverage     71.82%   71.79%   -0.04%     
   - Complexity      294      303       +9     
   ============================================
     Files           383      386       +3     
     Lines         16549    16598      +49     
     Branches       1663     1667       +4     
   ============================================
   + Hits          11887    11916      +29     
   - Misses         3930     3946      +16     
   - Partials        732      736       +4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (ø)` | |
   | [...ies/serde/AbstractHoodieKafkaAvroDeserializer.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NlcmRlL0Fic3RyYWN0SG9vZGllS2Fma2FBdnJvRGVzZXJpYWxpemVyLmphdmE=) | `60.60% <60.60%> (ø)` | `6.00 <6.00> (?)` | |
   | [...e/hudi/utilities/serde/HoodieKafkaAvroDecoder.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NlcmRlL0hvb2RpZUthZmthQXZyb0RlY29kZXIuamF2YQ==) | `75.00% <75.00%> (ø)` | `1.00 <1.00> (?)` | |
   | [...hudi/utilities/schema/FilebasedSchemaProvider.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9GaWxlYmFzZWRTY2hlbWFQcm92aWRlci5qYXZh) | `83.33% <100.00%> (+2.08%)` | `5.00 <0.00> (ø)` | |
   | [...e/config/HoodieKafkaAvroDeserializationConfig.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NlcmRlL2NvbmZpZy9Ib29kaWVLYWZrYUF2cm9EZXNlcmlhbGl6YXRpb25Db25maWcuamF2YQ==) | `100.00% <100.00%> (ø)` | `2.00 <2.00> (?)` | |
   | [...che/hudi/common/util/BufferedRandomAccessFile.java](https://codecov.io/gh/apache/incubator-hudi/pull/1565/diff?src=pr&el=tree#diff-aHVkaS1jb21tb24vc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29tbW9uL3V0aWwvQnVmZmVyZWRSYW5kb21BY2Nlc3NGaWxlLmphdmE=) | `54.38% <0.00%> (-0.88%)` | `0.00% <0.00%> (ø%)` | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=footer). Last update [19ca0b5...a71f602](https://codecov.io/gh/apache/incubator-hudi/pull/1565?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416667228



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();

Review comment:
       @vinothchandar @afilipchik I explored this a bit. The deserialization process has to mirror exactly how the data was serialized. Also, it is highly discouraged not to use a schema registry when working with Avro and Kafka [1][2][3].
   That said, if someone wants to use the vanilla AvroKafkaSource, they will have to serialize the data without an actual schema-registry setup. Hence, as a matter of practice, a good alternative can be to mandate the use of the MockSchemaRegistryClient class for serialization, in which case the schema-registry-specific code written above should be fine (a sketch follows after the links below). 
   
   I am suggesting the use of the MockSchemaRegistryClient class since Confluent uses it for writing their test cases, and this would establish a standard practice for using the HoodieKafkaAvroDecoder class introduced with this PR. 
   
   If we agree to this, I will mention the same as a comment in the code above. 
   
   [1] https://github.com/confluentinc/confluent-kafka-dotnet/issues/530
   [2] https://medium.com/@igorvlahek1/no-need-for-schema-registry-in-your-spring-kafka-tests-a5b81468a0e1
   [3] https://stackoverflow.com/questions/45635726/kafkaavroserializer-for-serializing-avro-without-schema-registry-url
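   
   A hedged sketch of the suggested producer-side setup, assuming the Confluent client API of that era (not part of this PR; the dummy registry URL is required by the serializer config but never contacted):
   
   ```java
   import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
   import io.confluent.kafka.serializers.KafkaAvroSerializer;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   
   import java.util.Collections;
   
   class MockRegistryProducerSketch {
     // Serializing through MockSchemaRegistryClient keeps the Confluent framing
     // (magic byte + schema id) that HoodieKafkaAvroDecoder expects, without
     // deploying an actual schema-registry service.
     static byte[] serialize(String topic, GenericRecord record, Schema schema) throws Exception {
       MockSchemaRegistryClient registry = new MockSchemaRegistryClient();
       registry.register(topic + "-value", schema);  // in-memory only
       KafkaAvroSerializer serializer = new KafkaAvroSerializer(registry,
           Collections.singletonMap("schema.registry.url", "http://unused:8081"));
       return serializer.serialize(topic, record);
     }
   }
   ```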




[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r419848101



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       let me find it. We had to patch the Kafka -> Avro deserialization in Hudi 0.5 as it was breaking in some situations. 
   In general, based on the spec at https://avro.apache.org/docs/current/spec.html:
   
   > Data Serialization and Deserialization
   
   > Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema.
   
   > Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] afilipchik commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
afilipchik commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r419848101



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);

Review comment:
       let me find it. We had to patch the kafka -> avro deserializer in hudi 0.5 as it was breaking in some situations.
   In general, based on the spec at https://avro.apache.org/docs/current/spec.html:
   
   > Data Serialization and Deserialization
   
   > Binary encoded Avro data does not include type information or field names. The benefit is that the serialized data is small, but as a result a schema must always be used in order to read Avro data correctly. The best way to ensure that the schema is structurally identical to the one used to write the data is to use the exact same schema.
   
   > Therefore, files or systems that store Avro data should always include the writer's schema for that data. Avro-based remote procedure call (RPC) systems must also guarantee that remote recipients of data have a copy of the schema used to write that data. In general, it is advisable that any reader of Avro data should use a schema that is the same (as defined more fully in Parsing Canonical Form for Schemas) as the schema that was used to write the data in order to deserialize it correctly. Deserializing data into a newer schema is accomplished by specifying an additional schema, the results of which are described in Schema Resolution.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] pratyakshsharma commented on a change in pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
pratyakshsharma commented on a change in pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#discussion_r416489455



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/serde/AbstractHoodieKafkaAvroDeserializer.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.serde;
+
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.serde.config.HoodieKafkaAvroDeserializationConfig;
+
+import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
+import kafka.utils.VerifiableProperties;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class AbstractHoodieKafkaAvroDeserializer {
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private boolean useSpecificAvroReader = false;
+  private Schema sourceSchema;
+
+  public AbstractHoodieKafkaAvroDeserializer(VerifiableProperties properties) {
+    this.sourceSchema = new Schema.Parser().parse(properties.props().getProperty(FilebasedSchemaProvider.Config.SOURCE_SCHEMA_PROP));
+  }
+
+  protected void configure(HoodieKafkaAvroDeserializationConfig config) {
+    useSpecificAvroReader = config
+      .getBoolean(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG);
+  }
+
+  protected Object deserialize(byte[] payload) throws SerializationException {
+    return deserialize(null, null, payload, sourceSchema);
+  }
+
+  protected Object deserialize(String topic, Boolean isKey, byte[] payload, Schema readerSchema) {
+    try {
+      ByteBuffer buffer = this.getByteBuffer(payload);
+      int id = buffer.getInt();
+      int length = buffer.limit() - 1 - 4;
+      Object result;
+      if (sourceSchema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = this.getDatumReader(sourceSchema, readerSchema);

Review comment:
       They are both the same. :) 
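   
   For context, the arithmetic in the diff above (buffer.limit() - 1 - 4) matches the Confluent wire format: 1 magic byte, a 4-byte schema id, then the Avro-encoded record. A hedged sketch of unwrapping that header (names invented; this is not the PR's getByteBuffer implementation):
   
   ```java
   import java.nio.ByteBuffer;
   
   public class WireFormatSketch {
     private static final byte MAGIC_BYTE = 0x0;
   
     // Returns a buffer positioned at the start of the Avro-encoded record.
     public static ByteBuffer unwrap(byte[] payload) {
       ByteBuffer buffer = ByteBuffer.wrap(payload);
       if (buffer.get() != MAGIC_BYTE) {      // 1 byte: magic
         throw new IllegalArgumentException("Unknown magic byte!");
       }
       buffer.getInt();                       // 4 bytes: schema id (ignored without a registry)
       return buffer;                         // remaining bytes: binary Avro
     }
   }
   ```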




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on pull request #1565: [HUDI-73]: implemented vanilla AvroKafkaSource

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1565:
URL: https://github.com/apache/incubator-hudi/pull/1565#issuecomment-628889429


   Overall, this PR is nice in the sense that it lets us read data from Kafka using Avro, with a fixed schema..
   but then, it cannot handle evolutions that well (this is an expected tradeoff, right).. I feel we should understand a little better how people are sending vanilla avro data into kafka today, before we can resolve the open items here..
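   
   For reference, Avro can bridge an old writer schema and a newer reader schema when both are supplied; the limitation with a single file-based schema is that the deserializer only has one of the two. A hedged sketch, with both schema strings invented for illustration:
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.DecoderFactory;
   
   public class SchemaEvolutionSketch {
     public static GenericRecord readEvolved(byte[] payload) throws java.io.IOException {
       Schema writer = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
       // The reader schema adds a field with a default; schema resolution fills it in.
       Schema reader = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Rec\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"long\"},"
               + "{\"name\":\"note\",\"type\":\"string\",\"default\":\"\"}]}");
       GenericDatumReader<GenericRecord> datumReader = new GenericDatumReader<>(writer, reader);
       return datumReader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
     }
   }
   ```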


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


