You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/18 04:04:31 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1080] Add
configuration to add schema creation time in converter
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4ca12cf [GOBBLIN-1080] Add configuration to add schema creation time in converter
4ca12cf is described below
commit 4ca12cffbe13c80c5acd5aaa3ee81eda22818b22
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Mar 17 21:04:23 2020 -0700
[GOBBLIN-1080] Add configuration to add schema creation time in converter
Closes #2925 from ZihanLi58/GOBBLIN-1080-new
---
.../java/org/apache/gobblin/converter/Converter.java | 1 +
.../converter/filter/AvroProjectionConverter.java | 6 ++++--
.../GobblinTrackingEventFlattenFilterConverter.java | 2 ++
.../apache/gobblin/writer/AvroHdfsDataWriterTest.java | 10 ++++++++--
.../main/java/org/apache/gobblin/util/AvroUtils.java | 19 +++++++++++++++++++
5 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
index b022304..2b50ec0 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/converter/Converter.java
@@ -81,6 +81,7 @@ public abstract class Converter<SI, SO, DI, DO> implements Closeable, FinalState
*
* <p>
* Schema conversion is limited to have a 1-to-1 mapping between the input and output schema.
+ * When trying to convert an Avro schema, please call {@link AvroUtils#addSchemaCreationTime} to preserve the schema creationTime.
* </p>
*
* @param inputSchema input schema to be converted
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
index 2d2202f..b9ff18f 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/AvroProjectionConverter.java
@@ -74,10 +74,12 @@ public class AvroProjectionConverter extends AvroToAvroConverterBase {
*/
@Override
public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+ Schema outputSchema = inputSchema;
if (this.fieldRemover.isPresent()) {
- return this.fieldRemover.get().removeFields(inputSchema);
+ outputSchema = this.fieldRemover.get().removeFields(inputSchema);
}
- return inputSchema;
+ AvroUtils.addSchemaCreationTime(inputSchema, outputSchema);
+ return outputSchema;
}
/**
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
index d393a8c..0db03ec 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/converter/filter/GobblinTrackingEventFlattenFilterConverter.java
@@ -34,6 +34,7 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.typesafe.config.Config;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.AvroToAvroConverterBase;
import org.apache.gobblin.converter.Converter;
@@ -116,6 +117,7 @@ public class GobblinTrackingEventFlattenFilterConverter extends AvroToAvroConver
.createRecord(ConfigUtils.getString(config, NEW_SCHEMA_NAME, inputSchema.getName()), inputSchema.getDoc(),
inputSchema.getNamespace(), inputSchema.isError());
outputSchema.setFields(newFields);
+ AvroUtils.addSchemaCreationTime(inputSchema, outputSchema);
return outputSchema;
}
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
index bbe772d..2e44336 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/AvroHdfsDataWriterTest.java
@@ -27,6 +27,8 @@ import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -50,7 +52,8 @@ import org.apache.gobblin.util.FinalState;
*/
@Test(groups = { "gobblin.writer" })
public class AvroHdfsDataWriterTest {
-
+ private static final String TEST_PROPERTY_KEY = "test.property";
+ private static final String TEST_PROPERTY_VALUE = "testValue";
private static final Type FIELD_ENTRY_TYPE = new TypeToken<Map<String, Object>>() {}.getType();
private Schema schema;
@@ -71,6 +74,7 @@ public class AvroHdfsDataWriterTest {
}
this.schema = new Schema.Parser().parse(TestConstants.AVRO_SCHEMA);
+ schema.addProp(TEST_PROPERTY_KEY, TEST_PROPERTY_VALUE);
this.filePath = TestConstants.TEST_EXTRACT_NAMESPACE.replaceAll("\\.", "/") + "/" + TestConstants.TEST_EXTRACT_TABLE
+ "/" + TestConstants.TEST_EXTRACT_ID + "_" + TestConstants.TEST_EXTRACT_PULL_TYPE;
@@ -104,7 +108,9 @@ public class AvroHdfsDataWriterTest {
File outputFile =
new File(TestConstants.TEST_OUTPUT_DIR + Path.SEPARATOR + this.filePath, TestConstants.TEST_FILE_NAME);
DataFileReader<GenericRecord> reader =
- new DataFileReader<>(outputFile, new GenericDatumReader<GenericRecord>(this.schema));
+ new DataFileReader<>(outputFile, new GenericDatumReader<GenericRecord>());
+ Schema fileSchema = reader.getSchema();
+ Assert.assertEquals(fileSchema.getProp(TEST_PROPERTY_KEY), TEST_PROPERTY_VALUE);
// Read the records back and assert they are identical to the ones written
GenericRecord user1 = reader.next();
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 78b6152..bf4dc76 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -54,6 +54,7 @@ import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -97,6 +98,8 @@ public class AvroUtils {
private static final String AVRO_SUFFIX = ".avro";
+ private static final String SCHEMA_CREATION_TIME_KEY = "CreatedOn";
+
/**
* Validates that the provided reader schema can be used to decode avro data written with the
* provided writer schema.
@@ -116,6 +119,22 @@ public class AvroUtils {
return SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema).getType().equals(SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE);
}
+ public static Schema addSchemaCreationTime(Schema inputSchema, Schema outputSchema) {
+ if (inputSchema.getProp(SCHEMA_CREATION_TIME_KEY) != null && outputSchema.getProp(SCHEMA_CREATION_TIME_KEY) == null) {
+ outputSchema.addProp(SCHEMA_CREATION_TIME_KEY, inputSchema.getProp(SCHEMA_CREATION_TIME_KEY));
+ }
+ return outputSchema;
+ }
+
+ public static String getSchemaCreationTime(Schema inputSchema) {
+ return inputSchema.getProp(SCHEMA_CREATION_TIME_KEY);
+ }
+
+ public static Schema setSchemaCreationTime(Schema inputSchema, String creationTime) {
+ inputSchema.addProp(SCHEMA_CREATION_TIME_KEY, creationTime);
+ return inputSchema;
+ }
+
public static List<Field> deepCopySchemaFields(Schema readerSchema) {
return readerSchema.getFields().stream()
.map(field -> {