You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/18 04:04:31 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1080] Add configuration to add schema creation time in converter

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ca12cf  [GOBBLIN-1080] Add configuration to add schema creation time in converter
4ca12cf is described below

commit 4ca12cffbe13c80c5acd5aaa3ee81eda22818b22
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Mar 17 21:04:23 2020 -0700

    [GOBBLIN-1080] Add configuration to add schema creation time in converter
    
    Closes #2925 from ZihanLi58/GOBBLIN-1080-new
---
 .../java/org/apache/gobblin/converter/Converter.java  |  1 +
 .../converter/filter/AvroProjectionConverter.java     |  6 ++++--
 .../GobblinTrackingEventFlattenFilterConverter.java   |  2 ++
 .../apache/gobblin/writer/AvroHdfsDataWriterTest.java | 10 ++++++++--
 .../main/java/org/apache/gobblin/util/AvroUtils.java  | 19 +++++++++++++++++++
 5 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index b022304..2b50ec0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -81,6 +81,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
    *
    * <p>
    *   Schema conversion is limited to have a 1-to-1 mapping between the input and output schema.
+   *   When converting an Avro schema, please call {@link AvroUtils#addSchemaCreationTime} to preserve the schema creation time.
    * </p>
    *
    * @param inputSchema input schema to be converted
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
index 2d2202f..b9ff18f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
@@ -74,10 +74,12 @@ public class AvroProjectionConverter extends AvroToAvroConverterBase {
    */
   @Override
   public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+    Schema outputSchema = inputSchema;
     if (this.fieldRemover.isPresent()) {
-      return this.fieldRemover.get().removeFields(inputSchema);
+      outputSchema = this.fieldRemover.get().removeFields(inputSchema);
     }
-    return inputSchema;
+    AvroUtils.addSchemaCreationTime(inputSchema, outputSchema);
+    return outputSchema;
   }
 
   /**
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index d393a8c..0db03ec 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -34,6 +34,7 @@ import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.AvroToAvroConverterBase;
 import org.apache.gobblin.converter.Converter;
@@ -116,6 +117,7 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
         .createRecord(ConfigUtils.getString(config, NEW_SCHEMA_NAME, inputSchema.getName()), inputSchema.getDoc(),
             inputSchema.getNamespace(), inputSchema.isError());
     outputSchema.setFields(newFields);
+    AvroUtils.addSchemaCreationTime(inputSchema, outputSchema);
     return outputSchema;
   }
 
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
index bbe772d..2e44336 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
@@ -27,6 +27,8 @@ import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
@@ -50,7 +52,8 @@ import org.apache.gobblin.util.FinalState;
  */
 @Test(groups = { "gobblin.writer" })
 public class AvroHdfsDataWriterTest {
-
+  private static final String TEST_PROPERTY_KEY = "test.property";
+  private static final String TEST_PROPERTY_VALUE = "testValue";
   private static final Type FIELD_ENTRY_TYPE = new TypeToken<Map<String, Object>>() {}.getType();
 
   private Schema schema;
@@ -71,6 +74,7 @@ public class AvroHdfsDataWriterTest {
     }
 
     this.schema = new Schema.Parser().parse(TestConstants.AVRO_SCHEMA);
+    schema.addProp(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE);
 
     this.filePath = TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE
         + "/" + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
@@ -104,7 +108,9 @@ public class AvroHdfsDataWriterTest {
     File outputFile =
         new File(TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath, TestConstants.TEST_FILE_NAME);
     DataFileReader<GenericRecord> reader =
-        new DataFileReader<>(outputFile, new GenericDatumReader<GenericRecord>(this.schema));
+        new DataFileReader<>(outputFile, new GenericDatumReader<GenericRecord>());
+    Schema fileSchema = reader.getSchema();
+    Assert.assertEquals(fileSchema.getProp(TEST_PROPERTY_KEY), TEST_PROPERTY_VALUE);
 
     // Read the records back and assert they are identical to the ones written
     GenericRecord user1 = reader.next();
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 78b6152..bf4dc76 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -54,6 +54,7 @@ import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -97,6 +98,8 @@ public class AvroUtils {
 
   private static final String AVRO_SUFFIX = ".avro";
 
+  private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
+
   /**
    * Validates that the provided reader schema can be used to decode avro data written with the
    * provided writer schema.
@@ -116,6 +119,22 @@ public class AvroUtils {
     return SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema).getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
   }
 
+  public static Schema addSchemaCreationTime(Schema inputSchema, Schema outputSchema) {
+    if (inputSchema.getProp(SCHEMA_CREATION_TIME_KEY) != null && outputSchema.getProp(SCHEMA_CREATION_TIME_KEY) == null) {
+      outputSchema.addProp(SCHEMA_CREATION_TIME_KEY, inputSchema.getProp(SCHEMA_CREATION_TIME_KEY));
+    }
+    return outputSchema;
+  }
+
+  public static String getSchemaCreationTime(Schema inputSchema) {
+    return inputSchema.getProp(SCHEMA_CREATION_TIME_KEY);
+  }
+
+  public static Schema setSchemaCreationTime(Schema inputSchema, String creationTime) {
+    inputSchema.addProp(SCHEMA_CREATION_TIME_KEY, creationTime);
+    return inputSchema;
+  }
+
   public static List<Field> deepCopySchemaFields(Schema readerSchema) {
     return readerSchema.getFields().stream()
         .map(field -> {