Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/08/25 18:21:59 UTC

[GitHub] [incubator-gobblin] ZihanLi58 opened a new pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

ZihanLi58 opened a new pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091


   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   - [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/) issues and references them in the PR title. For example, "[GOBBLIN-XXX] My Gobblin PR"
       - https://issues.apache.org/jira/browse/GOBBLIN-1248
   
   
   ### Description
   - [ ] Here are some details about my PR, including screenshots (if applicable):
   **Why this PR?**
   Previously, in the streaming pipeline, Hive registration always fetched the latest schema from the Kafka schema registry to avoid a race condition on the metadata schema. Because a Gobblin converter may change the schema, this introduced a discrepancy between the Hive table schema and the actual file schema, so we need a better way to solve this problem.
   **Solution**
   We leverage the schema creation time carried in the schema fetched from the schema registry, and keep that information when the converter transforms the schema. If the creation time of the writer's schema is the same as that of the latest schema, we update the table schema to the writer's schema; otherwise we keep the existing table's schema.
   **What's the change**
   By default there is no behavior change. To leverage this feature, the pipeline must be configured to fetch schemas through schema registry V2, which includes the schema creation time.
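A minimal sketch of the selection rule described above. Schemas and creation times are modeled as plain strings, and `SchemaChoice` / `chooseTableSchema` are hypothetical names, not Gobblin APIs. Treating a missing creation time as "register the writer schema" is an assumption taken from the early revision of the diff quoted later in this thread.

```java
// Hypothetical sketch of the rule in the PR description; not Gobblin code.
final class SchemaChoice {

  /**
   * Picks the schema literal to register on the Hive table.
   *
   * @param writerSchema        schema the writer used for the data files
   * @param writerCreationTime  creation time carried on the writer schema, or null
   * @param latestCreationTime  creation time of the registry's latest schema, or null
   * @param existingTableSchema schema currently registered on the Hive table
   */
  static String chooseTableSchema(String writerSchema, String writerCreationTime,
      String latestCreationTime, String existingTableSchema) {
    // No creation times to compare (e.g. registry V1): register the writer schema.
    if (writerCreationTime == null || latestCreationTime == null) {
      return writerSchema;
    }
    // Same creation time: the writer schema was derived from the registry's latest schema,
    // so it is safe to register even if the converter changed it.
    if (writerCreationTime.equals(latestCreationTime)) {
      return writerSchema;
    }
    // The writer is not on the registry's latest schema: keep the existing table schema.
    return existingTableSchema;
  }

  public static void main(String[] args) {
    // Converter rewrote the latest schema but preserved its creation time "t2".
    System.out.println(chooseTableSchema("converted-v2", "t2", "t2", "table-v1")); // converted-v2
    // A writer still on the schema created at "t1": the table schema is left unchanged.
    System.out.println(chooseTableSchema("converted-v1", "t1", "t2", "table-v1")); // table-v1
  }
}
```

The second call is the case where registration is effectively deferred until a writer running on the registry's latest schema comes along.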
   
   ### Tests
   - [ ] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
   Ran a streaming pipeline to verify that Hive registration works correctly.
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r478652534



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -201,15 +201,14 @@ protected void registerPath(HiveSpec spec) throws IOException {
    * existing table.
    * Note: If there is no schema specified in the table spec, we will directly update the schema to
    * the existing table schema
-   * Note: We treat the creation time as version number of schema, since according to Kafka team,
-   * schema registry allows "out of order registration" of schemas, this means chronological latest is
-   * NOT what the registry considers latest.
+   * Note: We cannot treat the creation time as version number of schema, since schema registry allows
+   * "out of order registration" of schemas, this means chronological latest is NOT what the registry considers latest.
    * @param spec
    * @param table
    * @param existingTable
    * @throws IOException
    */
-  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
+  protected void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{

Review comment:
       Nit: It would be good to annotate this as @VisibleForTesting.







[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r477465254



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,57 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime == null || existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+          // schema registry to determine whether to update the schema
+          Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);

Review comment:
       Do we need to get the latest schema from schema registry here? Can we have the converter set the latest schema and have it accessed here instead of fetching the schema twice from schema registry, once inside the converter and once here?

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,57 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of

Review comment:
       It is not immediately clear why we should not update table schema if the latest schema from schema registry does not match the writer schema. Am I correct in understanding that we are delaying the schema registration in the expectation that eventually the writer schema will match the latest schema and the table schema is updated then? 







[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r478616373



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
-        table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+        Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime != null && !existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+          // schema registry to determine whether to update the schema
+          Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+          String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+          if (latestSchemaCreationTime != null && latestSchemaCreationTime.equals(existingSchemaCreationTime)) {

Review comment:
       By default, the table spec carries the writer schema, so we only need to update it when we do not want to change the table schema, in which case we set it to the existing table's schema.
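The final revision quoted above can be sketched as follows, assuming (per the reply above) that the truncated `if` body resets the spec to the existing table schema. The registry lookup is modeled as a `Supplier<String>` so the sketch can show that the registry is only consulted when the writer and table creation times differ; `LazySchemaChoice` and its parameters are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the final revision's control flow; names are illustrative.
final class LazySchemaChoice {

  static String chooseTableSchema(String writerSchema, String writerCreationTime,
      String existingTableSchema, String existingCreationTime,
      Supplier<String> latestCreationTimeFromRegistry) {
    // Default: the table spec already carries the writer schema.
    String result = writerSchema;
    if (existingCreationTime != null && !existingCreationTime.equals(writerCreationTime)) {
      // Only now is the schema registry consulted.
      String latestCreationTime = latestCreationTimeFromRegistry.get();
      if (latestCreationTime != null && latestCreationTime.equals(existingCreationTime)) {
        // The table already holds the registry's latest schema; don't let a writer
        // on a different schema overwrite it.
        result = existingTableSchema;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    int[] registryCalls = {0};
    Supplier<String> registry = () -> {
      registryCalls[0]++;
      return "t2";
    };
    // Writer and table share creation time "t2": no registry call, writer schema wins.
    System.out.println(chooseTableSchema("w2", "t2", "e2", "t2", registry)); // w2
    // Writer on "t1" while the table holds the latest "t2": keep the table schema.
    System.out.println(chooseTableSchema("w1", "t1", "e2", "t2", registry)); // e2
    // Schema evolution: writer on "t3", which the registry now reports as latest.
    System.out.println(chooseTableSchema("w3", "t3", "e2", "t2", () -> "t3")); // w3
    System.out.println(registryCalls[0]); // 1
  }
}
```

Defaulting to the writer schema keeps the common case (writer and table already agree) free of registry round trips.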







[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r477970367



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,57 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime == null || existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+          // schema registry to determine whether to update the schema
+          Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);

Review comment:
       Synced offline and applied a new solution.







[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476744238



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);

Review comment:
       According to the Kafka team, the schema registry allows "out of order registration" of schemas; think of this as sorting schemas by compatibility instead of by timestamp. This means the chronologically latest schema is NOT what the registry considers latest. I also included this information in the comments to avoid confusion.
   In addition, I updated the PR to first compare the creation time with that of the existing schema; only if they differ do we fetch the latest schema to get its creation time. This way we avoid too many calls to the schema registry.
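To illustrate why a "writer creation time > table creation time" rule is unsafe under out-of-order registration, here is a toy model of a registry that orders versions by compatibility. It is not a schema registry client; `OutOfOrderRegistry`, `SchemaVersion` and both helper methods are hypothetical.

```java
import java.util.Comparator;
import java.util.List;

// Toy model, not a schema registry client: versions are kept in the registry's own
// (compatibility-based) order, which need not match creation-time order.
final class OutOfOrderRegistry {

  static final class SchemaVersion {
    final String name;
    final long creationTime;

    SchemaVersion(String name, long creationTime) {
      this.name = name;
      this.creationTime = creationTime;
    }
  }

  /** What the registry reports as latest: the last version in its own ordering. */
  static SchemaVersion registryLatest(List<SchemaVersion> versions) {
    return versions.get(versions.size() - 1);
  }

  /** What a "newest creation time wins" rule would pick. */
  static SchemaVersion chronologicalLatest(List<SchemaVersion> versions) {
    return versions.stream()
        .max(Comparator.comparingLong((SchemaVersion v) -> v.creationTime))
        .orElseThrow();
  }

  public static void main(String[] args) {
    // "c" was created after "b", but the registry orders "b" last.
    List<SchemaVersion> versions = List.of(
        new SchemaVersion("a", 100L),
        new SchemaVersion("c", 300L),
        new SchemaVersion("b", 200L));
    System.out.println(registryLatest(versions).name);      // b
    System.out.println(chronologicalLatest(versions).name); // c
  }
}
```

Because the two notions of "latest" can disagree, the change compares creation times only for equality, using them as schema identifiers rather than as a version ordering.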







[GitHub] [incubator-gobblin] asfgit closed pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091


   





[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476698036



##########
File path: gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
##########
@@ -24,6 +24,7 @@
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.util.ConfigUtils;

Review comment:
       Seems like this change is unintentional.

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+        String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+        // If no schema set for the table spec, we fall back to schema fetched from schema registry
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(writerSchemaCreationTime == null || latestSchemaCreationTime == null || latestSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the latest schema, we don't update the schema
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+                  existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        }
         table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+      } catch ( IOException e) {
+        log.error(String.format("Error when update latest schema for topic %s", topicName), e);

Review comment:
       update -> updating

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{

Review comment:
       It would be good to add javadoc explaining the update behavior. 

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +193,30 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);

Review comment:
       It's confusing why we are fetching latest schema from schema registry. Can we not get the schema creation time from the hive table and update schema if the writer schema creation time > table schema creation time? 







[GitHub] [incubator-gobblin] ZihanLi58 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476744679



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,47 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the latest schema fetched from schema registry
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+        String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+        // If no schema set for the table spec, we fall back to schema fetched from schema registry
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(writerSchemaCreationTime == null || latestSchemaCreationTime == null || latestSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the latest schema, we don't update the schema
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+                  existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        }
         table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+      } catch ( IOException e) {
+        log.error(String.format("Error when updating latest schema for topic %s", topicName), e);

Review comment:
       Thanks for pointing out, addressed.







[GitHub] [incubator-gobblin] ZihanLi58 commented on pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 commented on pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#issuecomment-680193461


   @sv2000 @autumnust Can you help review this PR? Thanks!





[GitHub] [incubator-gobblin] ZihanLi58 closed pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
ZihanLi58 closed pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091


   





[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476734787



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,47 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the latest schema fetched from schema registry
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+        String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+        // If no schema set for the table spec, we fall back to schema fetched from schema registry
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(writerSchemaCreationTime == null || latestSchemaCreationTime == null || latestSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the latest schema, we don't update the schema
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+                  existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        }
         table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+      } catch ( IOException e) {
+        log.error(String.format("Error when updating latest schema for topic %s", topicName), e);

Review comment:
       It is usually an anti-pattern to log an exception with log.error and also throw an IOException for the same failure. Please consider keeping only one of them.
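A minimal sketch of the two alternatives the reviewer is pointing at. It uses only the JDK: java.util.logging stands in for the logger used in the diff, and fetchLatestSchema is a hypothetical placeholder for the schema registry lookup, not a Gobblin API. The idea is to either propagate the failure with added context or handle it locally by logging and falling back, but not both.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical illustration of "log or throw, not both".
class LogOrThrow {
  private static final Logger LOG = Logger.getLogger(LogOrThrow.class.getName());

  // Hypothetical stand-in for a schema registry call that can fail.
  static String fetchLatestSchema(String topic) throws IOException {
    if (topic.isEmpty()) {
      throw new IOException("no topic given");
    }
    return "{\"type\":\"string\"}";
  }

  // Option 1: propagate. Wrap with context and let the caller decide whether to log.
  static String fetchOrThrow(String topic) throws IOException {
    try {
      return fetchLatestSchema(topic);
    } catch (IOException e) {
      throw new IOException(String.format("Error when updating latest schema for topic %s", topic), e);
    }
  }

  // Option 2: handle locally. Log once and return a fallback instead of rethrowing.
  static String fetchOrFallback(String topic, String fallback) {
    try {
      return fetchLatestSchema(topic);
    } catch (IOException e) {
      LOG.log(Level.WARNING, String.format("Error when updating latest schema for topic %s", topic), e);
      return fallback;
    }
  }
}
```

With either option the stack trace is reported exactly once: by whoever finally catches the wrapped exception in option 1, or by the local log call in option 2.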







[GitHub] [incubator-gobblin] sv2000 commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

Posted by GitBox <gi...@apache.org>.
sv2000 commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r478543139



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
-        table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+        Schema existingTableSchema = new Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), existingTableSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime != null && !existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          // If creation time of writer schema does not equal to the existing schema, we compare with schema fetched from
+          // schema registry to determine whether to update the schema
+          Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+          String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+          if (latestSchemaCreationTime != null && latestSchemaCreationTime.equals(existingSchemaCreationTime)) {

Review comment:
       Shouldn't there be an else block that updates the table schema if latestSchemaCreationTime does not equal existingSchemaCreationTime?
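A dependency-free sketch of the revised decision with the suggested else branch made explicit. It assumes that "updates the table schema" means adopting the writer's schema; that reading, the class name SchemaChoice, and the use of plain strings for schemas and nullable strings for creation times (mirroring AvroUtils.getSchemaCreationTime returning null) are all hypothetical and not taken from the PR.

```java
import java.util.function.Supplier;

// Hypothetical model of the revised updateSchema decision, with an explicit else branch.
final class SchemaChoice {
  private SchemaChoice() {}

  /**
   * Returns the schema literal to register for the table.
   *
   * @param existingSchema schema currently on the Hive table
   * @param existingTime   creation time of existingSchema, or null
   * @param writerSchema   schema from the table spec (defaults to existingSchema when absent)
   * @param writerTime     creation time of writerSchema, or null
   * @param latestTime     lazily fetched creation time of the registry's latest schema, or null
   */
  static String choose(String existingSchema, String existingTime,
                       String writerSchema, String writerTime,
                       Supplier<String> latestTime) {
    if (existingTime == null || existingTime.equals(writerTime)) {
      // Nothing to reconcile: the writer's schema matches the table, or there is no timestamp to compare.
      return writerSchema;
    }
    // Only consult the registry when the writer and the table disagree.
    String latest = latestTime.get();
    if (latest != null && latest.equals(existingTime)) {
      // The table already holds the registry's latest schema, so keep it.
      return existingSchema;
    }
    // Explicit else branch: the table is not on the latest schema, so move it to the writer's schema.
    return writerSchema;
  }
}
```

Passing the latest creation time as a Supplier keeps the registry call lazy, as in the diff, where getLatestSchemaByTopic is only invoked inside the branch where the writer and existing creation times differ.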

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,

Review comment:
       Is this comment accurate? Do you mean that we cannot use the creation time as the schema version number?
   
   Also, please drop "according to Kafka team" from the comment.
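The "out of order registration" mentioned in the Javadoc can be modelled in a few lines. In this hypothetical sketch, the registry's notion of "latest" is assumed to be the most recently registered schema, while each schema also carries its own creation time; the class names and sample values are illustrative only. With out-of-order registration the two notions of "latest" can disagree, which is consistent with the diff comparing creation times only for equality and never ordering schemas by them.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical entry: registration order as seen by the registry, plus the creation time carried in the schema.
final class RegisteredSchema {
  final int registrationOrder;
  final long creationTime;

  RegisteredSchema(int registrationOrder, long creationTime) {
    this.registrationOrder = registrationOrder;
    this.creationTime = creationTime;
  }
}

final class LatestSchemaViews {
  private LatestSchemaViews() {}

  // What the registry reports as "latest" (assumed here: the most recently registered entry).
  static RegisteredSchema registryLatest(List<RegisteredSchema> schemas) {
    return schemas.stream()
        .max(Comparator.comparingInt((RegisteredSchema s) -> s.registrationOrder))
        .get();
  }

  // Chronologically latest: the entry with the largest embedded creation time.
  static RegisteredSchema chronologicalLatest(List<RegisteredSchema> schemas) {
    return schemas.stream()
        .max(Comparator.comparingLong((RegisteredSchema s) -> s.creationTime))
        .get();
  }
}
```

For example, if a schema created at time 200 is registered after one created at time 300, registryLatest returns the former and chronologicalLatest returns the latter.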

##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,49 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the existing table schema
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{

Review comment:
       Is it possible to add unit tests for this method?



