Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/08/25 21:01:50 UTC

[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3091: [GOBBLIN-1248] Fix discrepancy between table schema and file schema

autumnust commented on a change in pull request #3091:
URL: https://github.com/apache/incubator-gobblin/pull/3091#discussion_r476734787



##########
File path: gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
##########
@@ -192,16 +192,47 @@ protected void registerPath(HiveSpec spec) throws IOException {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and compare that
+   * with the creation time of writer's schema. If they are the same, then we will update the
+   * table schema to the writer's schema, else we will keep the table schema the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly update the schema to
+   * the latest schema fetched from schema registry
+   * Note: We treat the creation time as version number of schema, since according to Kafka team,
+   * schema registry allows "out of order registration" of schemas, this means chronological latest is
+   * NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  private void updateSchema(HiveSpec spec, Table table, HiveTable existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema);
+        Schema latestSchema = (Schema) this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+        String latestSchemaCreationTime = AvroUtils.getSchemaCreationTime(latestSchema);
+        // If no schema set for the table spec, we fall back to schema fetched from schema registry
+        Schema writerSchema = new Schema.Parser().parse((
+            spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchema.toString())));
+        String writerSchemaCreationTime = AvroUtils.getSchemaCreationTime(writerSchema);
+        if(writerSchemaCreationTime == null || latestSchemaCreationTime == null || latestSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), writerSchema);
+        } else {
+          // If creation time of writer schema does not equal to the latest schema, we don't update the schema
+          spec.getTable()
+              .getSerDeProps()
+              .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
+                  existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        }
         table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", topicName), e);
+      } catch ( IOException e) {
+        log.error(String.format("Error when updating latest schema for topic %s", topicName), e);

Review comment:
       Logging an exception with log.error and also rethrowing it as an IOException is usually an anti-pattern, because the same failure then gets reported twice. Please consider keeping only one of them.
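
       To illustrate the two alternatives, here is a minimal, self-contained sketch. It is not the Gobblin code: `LogOrThrowSketch`, `fetchSchema`, `updateSchemaOrThrow`, and `updateSchemaOrKeep` are hypothetical names, and `java.util.logging` stands in for the project's logger so that the example has no dependencies. The first option wraps the failure with context and lets the caller report it; the second logs once and falls back to a value it already has.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class LogOrThrowSketch {
  private static final Logger LOG = Logger.getLogger(LogOrThrowSketch.class.getName());

  /** Hypothetical stand-in for a schema registry lookup; fails for an empty topic. */
  static String fetchSchema(String topic) throws IOException {
    if (topic.isEmpty()) {
      throw new IOException("no schema registered");
    }
    return "schema-for-" + topic;
  }

  /** Option 1: add context and rethrow; the caller decides how to report it. */
  static String updateSchemaOrThrow(String topic) throws IOException {
    try {
      return fetchSchema(topic);
    } catch (IOException e) {
      // No log call here: the wrapped exception carries the message and the cause.
      throw new IOException(
          String.format("Error when updating latest schema for topic %s", topic), e);
    }
  }

  /** Option 2: log once and recover locally; nothing is rethrown. */
  static String updateSchemaOrKeep(String topic, String existingSchema) {
    try {
      return fetchSchema(topic);
    } catch (IOException e) {
      LOG.log(Level.SEVERE,
          String.format("Error when updating latest schema for topic %s", topic), e);
      // Fall back to the schema we already have instead of propagating the failure.
      return existingSchema;
    }
  }

  public static void main(String[] args) {
    // Option 2 swallows the failure after logging it once.
    System.out.println(updateSchemaOrKeep("", "existing-schema"));
    // Option 1 surfaces the failure to the caller, with the original cause attached.
    try {
      updateSchemaOrThrow("");
    } catch (IOException e) {
      System.out.println(e.getMessage() + " (cause: " + e.getCause().getMessage() + ")");
    }
  }
}
```

       Which option fits depends on whether the caller can do anything useful with the failure. If it cannot, logging and continuing is enough; if it can, rethrowing with the cause attached avoids a duplicate stack trace in the logs.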



