You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/23 22:50:57 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-747] Check schema
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 6a725eb [GOBBLIN-747] Check schema
6a725eb is described below
commit 6a725eb9b76e974db352864d51a669bff871bd60
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Tue Apr 23 15:50:50 2019 -0700
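[GOBBLIN-747] Check schema
Adds an optional `expectedSchema` property to ConfigBasedDataset; when it is set, CopySource copies it into each generated WorkUnit under ConfigurationKeys.COPY_EXPECTED_SCHEMA.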
Closes #2612 from ZihanLi58/checkSchema
---
.../main/java/org/apache/gobblin/data/management/copy/CopySource.java | 4 ++++
.../gobblin/data/management/copy/replication/ConfigBasedDataset.java | 4 ++++
2 files changed, 8 insertions(+)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index a2d9dc2..3d1f2c8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -55,6 +55,7 @@ import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
import org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
+import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkGenerator;
import org.apache.gobblin.data.management.copy.watermark.CopyableFileWatermarkHelper;
@@ -357,6 +358,9 @@ public class CopySource extends AbstractSource<String, FileAwareInputStream> {
WorkUnit workUnit = new WorkUnit(extract);
workUnit.addAll(this.state);
+ if(this.copyableDataset instanceof ConfigBasedDataset && ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema() != null) {
+ workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, ((ConfigBasedDataset)this.copyableDataset).getExpectedSchema());
+ }
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
GobblinMetrics.addCustomTagToState(workUnit,
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 29d8276..83cd76e 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -50,6 +50,8 @@ import org.apache.gobblin.util.commit.DeleteFileCommitStep;
import org.apache.gobblin.util.filesystem.DataFileVersionStrategy;
import org.apache.gobblin.util.filesystem.ModTimeDataFileVersionStrategy;
+import lombok.Getter;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
@@ -78,6 +80,8 @@ public class ConfigBasedDataset implements CopyableDataset {
private final PathFilter pathFilter;
private final Optional<DataFileVersionStrategy> srcDataFileVersionStrategy;
private final Optional<DataFileVersionStrategy> dstDataFileVersionStrategy;
+ @Setter @Getter
+ protected String expectedSchema = null;
//Apply filter to directories
private final boolean applyFilterToDirectories;