Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/23 23:42:11 UTC

[GitHub] [hudi] vburenin opened a new pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

vburenin opened a new pull request #2598:
URL: https://github.com/apache/hudi/pull/2598


   ## What is the purpose of the pull request
   This PR adds the ability to (a configuration sketch follows after this list):
    - Inject Kafka meta information into the data received from the Avro Kafka source, so users can track which Kafka topic, partition, offset, and key delivered a specific record (very useful for debugging consistency issues). In a SQL query the user would need to specify <KAFKA_FIELDS> and set the appropriate options to enable the injection.
    - Specify a custom Kafka Avro data decoder that takes the latest source schema and different schema versions into account.
    - Cache the retrieved latest source and target schemas.
    - Return null for the target schema if it is not in use, by specifying "null" as the target schema URL. (I know there is NullTargetSchemaProvider, but it looks redundant.)
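
   As a rough illustration of how these options might be wired together, here is a hypothetical configuration sketch. Only `hoodie.deltastreamer.source.inject_kafka_fields` and `hoodie.deltastreamer.schemaprovider.class` appear in this PR; the registry URL keys and their values are placeholder assumptions.

   ```java
   import org.apache.hudi.common.config.TypedProperties;

   // Hypothetical DeltaStreamer property sketch for the features above (keys marked "assumed" are not from this PR).
   TypedProperties props = new TypedProperties();
   props.setProperty("hoodie.deltastreamer.schemaprovider.class",
       "org.apache.hudi.utilities.schema.SchemaRegistryProvider");
   // Inject Kafka topic/partition/offset/key meta fields into each record.
   props.setProperty("hoodie.deltastreamer.source.inject_kafka_fields", "true");
   // Assumed key for the source schema registry URL.
   props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url",
       "http://schema-registry:8081/subjects/my-topic-value/versions/latest");
   // Assumed key for the target schema registry URL; "null" means no target schema is used.
   props.setProperty("hoodie.deltastreamer.schemaprovider.registry.targetUrl", "null");
   ```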
   
   PS: This PR might be split into multiple PRs, but it comes as one since all of these features were developed simultaneously due to their mutual dependencies. Unit tests will be added later once we agree on this approach.
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vburenin closed pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin closed pull request #2598:
URL: https://github.com/apache/hudi/pull/2598


   





[GitHub] [hudi] vburenin commented on pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-785309398


   > @vburenin thanks for this. can we create a JIRA for this?
   
   It is in progress; I may need to create multiple tickets. What I am really interested in right now is a code review, so that I don't spend too much time writing unit tests and then end up rolling the work back.





[GitHub] [hudi] vburenin commented on pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-824052316


   There is no related PR yet. I have been planning to contribute this feature upstream in Q2; so far that seems possible.





[GitHub] [hudi] vinothchandar commented on pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-785308824


   The goals here all look good to me. Please give me and @nsivabalan some time to review.





[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r585177605



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject the reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       👍







[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r584213651



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject the reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       One more clarification, @vburenin: is this approach (passing sourceSchema instead of the reader schema) going to be useful to everyone (all Kafka source users) for assisting with schema evolution, even if they are not interested in the meta fields? If yes, I am wondering whether we should make this custom deserializer the default Kafka deserializer for Hudi going forward.
   Can you shed some light here? (A sketch of what the reader-schema injection achieves follows below.)
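
   For illustration only, here is a minimal, self-contained Avro sketch (not Hudi code; all names are placeholders) of what passing the latest source schema as the reader schema achieves: payloads written with an older writer schema are resolved into the latest schema at decode time, which is the schema-evolution benefit being discussed.

   ```java
   import java.io.IOException;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.DatumReader;
   import org.apache.avro.io.DecoderFactory;

   GenericRecord decode(byte[] payload, Schema writerSchema, Schema latestSourceSchema) throws IOException {
     // Resolve data written with writerSchema into latestSourceSchema (the "reader" schema),
     // so downstream stages always see records shaped by the latest schema.
     DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, latestSourceSchema);
     return reader.read(null, DecoderFactory.get().binaryDecoder(payload, 0, payload.length, null));
   }
   ```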
   







[GitHub] [hudi] ouzhang commented on pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
ouzhang commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-823846336


   Hello. I notice that "[HUDI-1650] Custom avro kafka deserializer." has been merged. Is there another PR related to the "custom kafka meta fields"? I'm looking forward to this feature.





[GitHub] [hudi] codecov-io edited a comment on pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-784622220


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=h1) Report
   > Merging [#2598](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=desc) (9bc13c2) into [master](https://codecov.io/gh/apache/hudi/commit/0dde7f9185a350599adaab5af5738b73a15243d7?el=desc) (0dde7f9) will **decrease** coverage by `42.37%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2598/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2598       +/-   ##
   ============================================
   - Coverage     51.55%   9.18%   -42.38%     
   + Complexity     3282      48     -3234     
   ============================================
     Files           445      55      -390     
     Lines         20317    2037    -18280     
     Branches       2102     251     -1851     
   ============================================
   - Hits          10475     187    -10288     
   + Misses         8976    1837     -7139     
   + Partials        866      13      -853     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.18% <0.00%> (-60.37%)` | `0.00 <0.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...i/utilities/deser/KafkaAvroSchemaDeserializer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2Rlc2VyL0thZmthQXZyb1NjaGVtYURlc2VyaWFsaXplci5qYXZh) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../hudi/utilities/schema/SchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFSZWdpc3RyeVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...lities/sources/helpers/AvroKafkaSourceHelpers.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9BdnJvS2Fma2FTb3VyY2VIZWxwZXJzLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../utilities/transform/SqlQueryBasedTransformer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9TcWxRdWVyeUJhc2VkVHJhbnNmb3JtZXIuamF2YQ==) | `0.00% <0.00%> (-75.00%)` | `0.00 <0.00> (-3.00)` | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | ... and [429 more](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree-more) | |
   





[GitHub] [hudi] vburenin commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583931865



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);

Review comment:
       Answering your first question:
   1. That is correct. The actual data source doesn't have any meta fields unless they are somehow encoded by the user; these fields are extracted on the Hudi side directly from the Kafka client and then injected into the deserialized object.
   
   2. The source schema used in the deserializer class can and should contain the Kafka meta fields, as they are optional and become part of the record. Without this prior schema modification I can't even call 'record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD, obj.offset());' in a map function, because it fails since such a field doesn't exist in the schema. A minimal sketch of that map function is shown below.
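
   For context, here is a minimal sketch of that map function (the `kafkaRDD` variable and the topic field name are assumptions; only KAFKA_PARTITION and KAFKA_OFFSET_META_FIELD come from AvroKafkaSourceHelpers in this PR):

   ```java
   import org.apache.avro.generic.GenericRecord;
   import org.apache.hudi.utilities.sources.helpers.AvroKafkaSourceHelpers;
   import org.apache.spark.api.java.JavaRDD;

   // kafkaRDD: JavaRDD<ConsumerRecord<Object, Object>> obtained from the Kafka source (assumed variable).
   JavaRDD<GenericRecord> withKafkaMeta = kafkaRDD.map(obj -> {
     GenericRecord record = (GenericRecord) obj.value();
     // These puts only work because the meta fields were added to the source schema beforehand.
     record.put(AvroKafkaSourceHelpers.KAFKA_PARTITION, obj.partition());
     record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET_META_FIELD, obj.offset());
     record.put("_hudi_kafka_topic", obj.topic()); // hypothetical topic meta field name
     return record;
   });
   ```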
   
   Answering the last question: it is called by the Kafka Fetcher (org.apache.kafka.clients.consumer.internals.Fetcher).
   Actually, let's see what the stack trace is:
   
   ```
   	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:92)
   	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:54)
   	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
   	at org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
   	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1009)
   	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:96)
   	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1186)
   	at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1500(Fetcher.java:1035)
   	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
   	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
   	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
   	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:200)
   	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:129)
   	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
   	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
   	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
   	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
   	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
   	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
   	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
   	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
   	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
   	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
   	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
   	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
   	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
   	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
   	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
   	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
   	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1364)
   	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
   	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:123)
   	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   	at java.lang.Thread.run(Thread.java:748)
   	```







[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583746379



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject the reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       this looks good. 

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
     }
   }
 
-  @Override
-  public Schema getTargetSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
-    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);

Review comment:
       Not for now, but when you put up the actual PR: I see we default to sourceSchemaRegistryUrl if the target registry URL is not found, but in this PR I don't see that. Is that intentional?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject the reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       May I know what the plan is for keeping this file in sync with AbstractKafkaAvroDeserializer across version upgrades?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);

Review comment:
       Maybe I am missing something: shouldn't we inject the Kafka meta fields only into the target schema? The source is the actual Kafka data, right? It may not have these meta fields.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject the reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       Does this work?
   ```
   @Override
     protected Object deserialize(
         boolean includeSchemaAndVersion,
         String topic,
         Boolean isKey,
         byte[] payload,
         Schema readerSchema)
         throws SerializationException {
      // pass sourceSchema as the last arg instead of readerSchema
      return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
   }
   ```
   
   

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AvroKafkaSourceHelpers {
+
+  public static final String INJECT_KAFKA_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";

Review comment:
       nit: maybe "kafka meta fields" instead of "kafka fields"?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AvroKafkaSourceHelpers {
+
+  public static final String INJECT_KAFKA_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";
+
+  public static final String KAFKA_PARTITION = "_hudi_kafka_partition";

Review comment:
       If we plan to provide this as a generic solution for everyone, let's reach consensus on the meta field naming. @vinothchandar.







[GitHub] [hudi] codecov-io edited a comment on pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-784622220


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=h1) Report
   > Merging [#2598](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=desc) (1860266) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `41.95%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2598/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2598       +/-   ##
   ============================================
   - Coverage     51.14%   9.18%   -41.96%     
   + Complexity     3215      48     -3167     
   ============================================
     Files           438      55      -383     
     Lines         20041    2035    -18006     
     Branches       2064     251     -1813     
   ============================================
   - Hits          10250     187    -10063     
   + Misses         8946    1835     -7111     
   + Partials        845      13      -832     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.18% <0.00%> (-60.28%)` | `0.00 <0.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...i/utilities/deser/KafkaAvroSchemaDeserializer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2Rlc2VyL0thZmthQXZyb1NjaGVtYURlc2VyaWFsaXplci5qYXZh) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../hudi/utilities/schema/SchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFSZWdpc3RyeVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...lities/sources/helpers/AvroKafkaSourceHelpers.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9BdnJvS2Fma2FTb3VyY2VIZWxwZXJzLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../utilities/transform/SqlQueryBasedTransformer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9TcWxRdWVyeUJhc2VkVHJhbnNmb3JtZXIuamF2YQ==) | `0.00% <0.00%> (-75.00%)` | `0.00 <0.00> (-3.00)` | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | ... and [422 more](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583797964



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
     }
   }
 
-  @Override
-  public Schema getTargetSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
-    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);

Review comment:
       cool




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vburenin commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583822334



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       It actually should work. There is some history behind this change that predates me, and as far as I can see it ended up with only the sourceSchema change. So yeah, it will drastically simplify things. Thanks for catching it!
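   Just to sketch what that simplification could look like (not the final code in this PR; this assumes the Confluent base class resolves records against the reader schema we pass in):
   ```java
   @Override
   protected Object deserialize(
       boolean includeSchemaAndVersion,
       String topic,
       Boolean isKey,
       byte[] payload,
       Schema readerSchema)
       throws SerializationException {
     // Skip the copy-pasted body: let the base class do the registry lookup and decoding,
     // but pass our latest source schema as the reader schema so records resolve against it.
     return super.deserialize(includeSchemaAndVersion, topic, isKey, payload, sourceSchema);
   }
   ```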




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vburenin commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583777748



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/AvroKafkaSourceHelpers.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class AvroKafkaSourceHelpers {
+
+  public static final String INJECT_KAFKA_FIELDS = "hoodie.deltastreamer.source.inject_kafka_fields";
+
+  public static final String KAFKA_PARTITION = "_hudi_kafka_partition";

Review comment:
       +1, we can also make it configurable.

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);

Review comment:
       There is a trick I am using here: the Kafka metadata is injected both into the incoming message and into the source schema when it is decoded, so it becomes part of the source schema. The target schema gets modified accordingly, or gets inferred if the target schema is "null".
   ```java
   if (injectKafkaData) {
         return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
             LocationStrategies.PreferConsistent()).map(AvroKafkaSourceHelpers::addKafkaFields);
   }
   ```
   
   AvroKafkaSourceHelpers::addKafkaFields is responsible for that:
   ```java
     public static GenericRecord addKafkaFields(ConsumerRecord<Object, Object> obj) {
       GenericRecord record = (GenericRecord) obj.value();
       record.put(AvroKafkaSourceHelpers.KAFKA_OFFSET, obj.offset());
       record.put(AvroKafkaSourceHelpers.KAFKA_PARTITION, obj.partition());
       record.put(AvroKafkaSourceHelpers.KAFKA_TOPIC, obj.topic());
       record.put(AvroKafkaSourceHelpers.KAFKA_KEY, obj.key());
       return record;
     }
   ```
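   The schema side is handled by AvroKafkaSourceHelpers.addKafkaMetadataFields(schema). Roughly it has this shape (only a sketch using the field constants above and the imports already in that class; the actual implementation may differ in the exact types and defaults):
   ```java
   public static Schema addKafkaMetadataFields(Schema schema) {
     List<Schema.Field> fields = new ArrayList<>();
     for (Schema.Field field : schema.getFields()) {
       // Schema.Field instances cannot be reused across schemas, so copy them.
       fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
     }
     // Kafka metadata fields are kept nullable since e.g. the record key may be null.
     fields.add(new Schema.Field(KAFKA_OFFSET, nullable(Schema.Type.LONG), "kafka offset", (Object) null));
     fields.add(new Schema.Field(KAFKA_PARTITION, nullable(Schema.Type.INT), "kafka partition", (Object) null));
     fields.add(new Schema.Field(KAFKA_TOPIC, nullable(Schema.Type.STRING), "kafka topic", (Object) null));
     fields.add(new Schema.Field(KAFKA_KEY, nullable(Schema.Type.STRING), "kafka key", (Object) null));
     Schema newSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
     newSchema.setFields(fields);
     return newSchema;
   }

   private static Schema nullable(Schema.Type type) {
     return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(type)));
   }
   ```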
       

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);
     } catch (IOException ioe) {
       throw new HoodieIOException("Error reading source schema from registry :" + registryUrl, ioe);
     }
   }
 
-  @Override
-  public Schema getTargetSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
-    String targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);

Review comment:
       This logic has been moved into constructor:
   ```java
    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
    this.noTargetSchema = targetRegistryUrl.equals("null");
   ```
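   For reference, a pipeline that only needs a source schema can then be configured roughly like this (double-check the exact property names against the Config constants in this class; the registry URL is just a placeholder):
   ```java
   TypedProperties props = new TypedProperties();
   // Source schema still comes from the registry.
   props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url",
       "https://my-registry:8081/subjects/my-topic-value/versions/latest");
   // The literal string "null" disables the target schema lookup entirely.
   props.setProperty("hoodie.deltastreamer.schemaprovider.registry.targetUrl", "null");
   ```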
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583833111



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);

Review comment:
       I get it now. But clarify something for me.
   ```
   return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
             LocationStrategies.PreferConsistent()).map(AvroKafkaSourceHelpers::addKafkaFields);
   ```
   What I infer from this is: first we read data from the Kafka source (using the custom deser class), and then we map to a GenericRecord by adding the Kafka meta fields.
   To the custom deser we pass in the source schema which has the meta fields added, but the actual Kafka source data won't have these meta fields, right?
   
   Once we return from this method, within SourceFormatAdapter I see we leverage sourceSchema (which is expected to have the meta fields). Here I agree we need the Kafka meta fields in the source schema.
   But I am not sure if the source schema used in the deser class can have the Kafka meta fields.
   
   Can you help me comprehend?
   Or can you help me understand where/when exactly AbstractKafkaAvroDeserializer.deserialize() is called in this flow?
   Sorry, I haven't worked w/ the Kafka source in Hudi; all my understanding is just from reading the code :)
   
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] codecov-io commented on pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-784622220


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=h1) Report
   > Merging [#2598](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=desc) (1068595) into [master](https://codecov.io/gh/apache/hudi/commit/43a0776c7c88a5f7beac6c8853db7e341810635a?el=desc) (43a0776) will **decrease** coverage by `42.07%`.
   > The diff coverage is `0.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/2598/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #2598       +/-   ##
   ============================================
   - Coverage     51.14%   9.06%   -42.08%     
   + Complexity     3215      48     -3167     
   ============================================
     Files           438      55      -383     
     Lines         20041    2062    -17979     
     Branches       2064     254     -1810     
   ============================================
   - Hits          10250     187    -10063     
   + Misses         8946    1862     -7084     
   + Partials        845      13      -832     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | hudicli | `?` | `?` | |
   | hudiclient | `?` | `?` | |
   | hudicommon | `?` | `?` | |
   | hudiflink | `?` | `?` | |
   | hudihadoopmr | `?` | `?` | |
   | hudisparkdatasource | `?` | `?` | |
   | hudisync | `?` | `?` | |
   | huditimelineservice | `?` | `?` | |
   | hudiutilities | `9.06% <0.00%> (-60.40%)` | `0.00 <0.00> (ø)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/2598?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...tilities/decoders/KafkaAvroSchemaDeserializer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL2RlY29kZXJzL0thZmthQXZyb1NjaGVtYURlc2VyaWFsaXplci5qYXZh) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../hudi/utilities/schema/SchemaRegistryProvider.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFSZWdpc3RyeVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...apache/hudi/utilities/sources/AvroKafkaSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb0thZmthU291cmNlLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (ΓΈ)` | |
   | [...lities/sources/helpers/AvroKafkaSourceHelpers.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvaGVscGVycy9BdnJvS2Fma2FTb3VyY2VIZWxwZXJzLmphdmE=) | `0.00% <0.00%> (ΓΈ)` | `0.00 <0.00> (?)` | |
   | [.../utilities/transform/SqlQueryBasedTransformer.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3RyYW5zZm9ybS9TcWxRdWVyeUJhc2VkVHJhbnNmb3JtZXIuamF2YQ==) | `0.00% <0.00%> (-75.00%)` | `0.00 <0.00> (-3.00)` | |
   | [...va/org/apache/hudi/utilities/IdentitySplitter.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL0lkZW50aXR5U3BsaXR0ZXIuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-2.00%)` | |
   | [...va/org/apache/hudi/utilities/schema/SchemaSet.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NjaGVtYS9TY2hlbWFTZXQuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-3.00%)` | |
   | [...a/org/apache/hudi/utilities/sources/RowSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvUm93U291cmNlLmphdmE=) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-4.00%)` | |
   | [.../org/apache/hudi/utilities/sources/AvroSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvQXZyb1NvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | [.../org/apache/hudi/utilities/sources/JsonSource.java](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree#diff-aHVkaS11dGlsaXRpZXMvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdXRpbGl0aWVzL3NvdXJjZXMvSnNvblNvdXJjZS5qYXZh) | `0.00% <0.00%> (-100.00%)` | `0.00% <0.00%> (-1.00%)` | |
   | ... and [422 more](https://codecov.io/gh/apache/hudi/pull/2598/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vburenin commented on pull request #2598: [HUDI-1648] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-789157626


   This is being split into multiple PRs. Closing.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r584212538



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaRegistryProvider.java
##########
@@ -58,30 +66,67 @@ private static String fetchSchemaFromRegistry(String registryUrl) throws IOExcep
   public SchemaRegistryProvider(TypedProperties props, JavaSparkContext jssc) {
     super(props, jssc);
     DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.SRC_SCHEMA_REGISTRY_URL_PROP));
+    this.cacheDisabled = !props.getBoolean(Config.CACHE_SCHEMAS, false);
+    this.injectKafkaFieldSchema = props.getBoolean(AvroKafkaSourceHelpers.INJECT_KAFKA_FIELDS, false);
+    this.registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    this.targetRegistryUrl = config.getString(Config.TARGET_SCHEMA_REGISTRY_URL_PROP, registryUrl);
+    this.noTargetSchema = targetRegistryUrl.equals("null");
   }
 
-  private static Schema getSchema(String registryUrl) throws IOException {
-    return new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+  private static Schema getSchema(String registryUrl, boolean injectKafkaFieldSchema) throws IOException {
+    Schema schema = new Schema.Parser().parse(fetchSchemaFromRegistry(registryUrl));
+    if (injectKafkaFieldSchema) {
+      return AvroKafkaSourceHelpers.addKafkaMetadataFields(schema);
+    }
+    return schema;
   }
 
   @Override
   public Schema getSourceSchema() {
-    String registryUrl = config.getString(Config.SRC_SCHEMA_REGISTRY_URL_PROP);
+    if (cacheDisabled) {
+      return getSourceSchemaFromRegistry();
+    }
+    if (sourceSchema == null) {
+      synchronized (this) {
+        if (sourceSchema == null) {
+          sourceSchema = getSourceSchemaFromRegistry();
+        }
+      }
+    }
+    return sourceSchema;
+  }
+
+  @Override
+  public Schema getTargetSchema() {
+    if (noTargetSchema) {
+      return null;
+    }
+    if (cacheDisabled) {
+      return getTargetSchemaFromRegistry();
+    }
+    if (targetSchema == null) {
+      synchronized (this) {
+        if (targetSchema == null) {
+          targetSchema = getTargetSchemaFromRegistry();
+        }
+      }
+    }
+    return targetSchema;
+  }
+
+  private Schema getSourceSchemaFromRegistry() {
     try {
-      return getSchema(registryUrl);
+      return getSchema(registryUrl, injectKafkaFieldSchema);

Review comment:
       Sounds good, thanks for clarifying. Makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vburenin commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vburenin commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r584225622



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       It is definitely going to be useful for everybody. The Kafka fields were added after this change was introduced. We had to keep up with the schema changes in the registry, and this was the way to do that. The primary assumption, which is most likely true for every user, is that the schema keeps evolving, so this is how we keep up with schema evolution.
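   To make the evolution point concrete, here is a tiny self-contained illustration of the Avro mechanism this relies on (just an illustration, not code from this PR): records written with an older writer schema are resolved against the newer reader schema, i.e. the latest source schema fetched from the registry, so added fields simply pick up their defaults.
   ```java
   import java.io.ByteArrayOutputStream;
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericDatumReader;
   import org.apache.avro.generic.GenericDatumWriter;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.io.BinaryEncoder;
   import org.apache.avro.io.DecoderFactory;
   import org.apache.avro.io.EncoderFactory;

   public class SchemaResolutionDemo {
     public static void main(String[] args) throws Exception {
       // Writer schema: what the producer used when the record was published.
       Schema writerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Evt\",\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
       // Reader schema: the latest source schema, evolved with a defaulted field.
       Schema readerSchema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Evt\",\"fields\":["
               + "{\"name\":\"id\",\"type\":\"long\"},"
               + "{\"name\":\"note\",\"type\":\"string\",\"default\":\"n/a\"}]}");

       GenericRecord rec = new GenericData.Record(writerSchema);
       rec.put("id", 42L);

       ByteArrayOutputStream out = new ByteArrayOutputStream();
       BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
       new GenericDatumWriter<GenericRecord>(writerSchema).write(rec, encoder);
       encoder.flush();

       // Same pattern as "new GenericDatumReader(schema, sourceSchema)" in the deserializer:
       // decode with the writer schema, resolve into the reader (latest source) schema.
       GenericRecord resolved = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
           .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
       System.out.println(resolved); // {"id": 42, "note": "n/a"}
     }
   }
   ```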
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] nsivabalan commented on a change in pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#discussion_r583747268



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/decoders/KafkaAvroSchemaDeserializer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.decoders;
+
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.NonRecordContainer;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.kafka.common.errors.SerializationException;
+import org.codehaus.jackson.node.JsonNodeFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+
+/**
+ * Extending {@link KafkaAvroDeserializer} as we need to be able to inject reader schema during deserialization.
+ */
+public class KafkaAvroSchemaDeserializer extends KafkaAvroDeserializer {
+
+  private static final String SCHEMA_PROVIDER_CLASS_PROP = "hoodie.deltastreamer.schemaprovider.class";
+
+  private final DecoderFactory decoderFactory = DecoderFactory.get();
+  private Schema sourceSchema;
+
+  public KafkaAvroSchemaDeserializer() {}
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    super.configure(configs, isKey);
+    try {
+      TypedProperties props = getConvertToTypedProperties(configs);
+      SchemaProvider schemaProvider = UtilHelpers.createSchemaProvider(
+          props.getString(SCHEMA_PROVIDER_CLASS_PROP), props, null);
+      sourceSchema = Objects.requireNonNull(schemaProvider).getSourceSchema();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Pretty much copy-paste from the {@link AbstractKafkaAvroDeserializer} except line 87:
+   * DatumReader reader = new GenericDatumReader(schema, sourceSchema);
+   * <p>
+   * We need to inject reader schema during deserialization or later stages of the pipeline break.
+   *
+   * @param includeSchemaAndVersion
+   * @param topic
+   * @param isKey
+   * @param payload
+   * @param readerSchema
+   * @return
+   * @throws SerializationException
+   */
+  @Override
+  protected Object deserialize(
+      boolean includeSchemaAndVersion,
+      String topic,
+      Boolean isKey,
+      byte[] payload,
+      Schema readerSchema)
+      throws SerializationException {
+    // Even if the caller requests schema & version, if the payload is null we cannot include it.
+    // The caller must handle
+    // this case.
+    if (payload == null) {
+      return null;
+    }
+    int id = -1;
+    try {
+      ByteBuffer buffer = getByteBuffer(payload);
+      id = buffer.getInt();
+      Schema schema = schemaRegistry.getByID(id);
+
+      int length = buffer.limit() - 1 - idSize;
+      final Object result;
+      if (schema.getType().equals(Schema.Type.BYTES)) {
+        byte[] bytes = new byte[length];
+        buffer.get(bytes, 0, length);
+        result = bytes;
+      } else {
+        int start = buffer.position() + buffer.arrayOffset();
+        DatumReader reader = new GenericDatumReader(schema, sourceSchema);

Review comment:
       May I know what's the plan for keeping this file in sync with AbstractKafkaAvroDeserializer across version upgrades?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on pull request #2598: [WIP] Added custom kafka meta fields and custom kafka avro decoder.

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #2598:
URL: https://github.com/apache/hudi/pull/2598#issuecomment-785308206


   @vburenin thanks for this. Can we create a JIRA for it?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org