You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/23 22:50:57 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-747] Check schema

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a725eb  [GOBBLIN-747] Check schema
6a725eb is described below

commit 6a725eb9b76e974db352864d51a669bff871bd60
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Apr 23 15:50:50 2019 -0700

    [GOBBLIN-747] Check schema
    
    Closes #2612 from ZihanLi58/checkSchema
---
 .../main/java/org/apache/gobblin/data/management/copy/CopySource.java | 4 ++++
 .../gobblin/data/management/copy/replication/ConfigBasedDataset.java  | 4 ++++
 2 files changed, 8 insertions(+)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index a2d9dc2..3d1f2c8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
 import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
 import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
 import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
@@ -357,6 +358,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
 
           WorkUnit workUnit = new WorkUnit(extract);
           workUnit.addAll(this.state);
+          if(this.copyableDataset instanceof ConfigBasedDataset && ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema() != null) {
+            workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema());
+          }
           serializeCopyEntity(workUnit, copyEntity);
           serializeCopyableDataset(workUnit, metadata);
           GobblinMetrics.addCustomTagToState(workUnit,
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 29d8276..83cd76e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -50,6 +50,8 @@ import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
 import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -78,6 +80,8 @@ public class ConfigBasedDataset implements CopyableDataset {
   private final PathFilter pathFilter;
   private final Optional<DataFileVersionStrategy> srcDataFileVersionStrategy;
   private final Optional<DataFileVersionStrategy> dstDataFileVersionStrategy;
+  @Setter @Getter
+  protected String expectedSchema = null;
 
   //Apply filter to directories
   private final boolean applyFilterToDirectories;