You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/11/22 21:52:02 UTC
[incubator-gobblin] branch master updated: [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 48d2eb5 [Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
48d2eb5 is described below
commit 48d2eb56e0a6b1b570c7cf67489b566976574a66
Author: Zihan Li <zi...@zihli-mn1.linkedin.biz>
AuthorDate: Fri Nov 22 13:51:55 2019 -0800
[Gobblin-975][GOBBLIN-975] Add flag to enable/disable avro type check in AvroToOrc
Closes #2822 from ZihanLi58/GOBBLIN-975
---
.../conversion/hive/source/HiveSource.java | 26 ++++++++++++----------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index fe4f94f..3ff8096 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -106,6 +106,8 @@ import org.apache.gobblin.data.management.conversion.hive.extractor.HiveConvertE
@Alpha
public class HiveSource implements Source {
+ public static final String DISABLE_AVRO_CHAECK = "hive.source.disable.avro.check";
+ public static final boolean DEFAULT_DISABLE_AVRO_CHAECK = false;
public static final String HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY = "hive.source.maximum.lookbackDays";
public static final int DEFAULT_HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS = 3;
@@ -163,7 +165,7 @@ public class HiveSource implements Source {
EventSubmitter.submit(Optional.of(this.eventSubmitter), EventConstants.CONVERSION_FIND_HIVE_TABLES_EVENT);
Iterator<HiveDataset> iterator = this.datasetFinder.getDatasetsIterator();
-
+ boolean disableAvroCheck = state.getPropAsBoolean(DISABLE_AVRO_CHAECK, DEFAULT_DISABLE_AVRO_CHAECK);
while (iterator.hasNext()) {
HiveDataset hiveDataset = iterator.next();
try (AutoReturnableObject<IMetaStoreClient> client = hiveDataset.getClientPool().getClient()) {
@@ -174,9 +176,9 @@ public class HiveSource implements Source {
if (hiveDataset.getTable().isPartitioned()
&& state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
- createWorkunitsForPartitionedTable(hiveDataset, client);
+ createWorkunitsForPartitionedTable(hiveDataset, client, disableAvroCheck);
} else {
- createWorkunitForNonPartitionedTable(hiveDataset);
+ createWorkunitForNonPartitionedTable(hiveDataset, disableAvroCheck);
}
}
}
@@ -221,7 +223,7 @@ public class HiveSource implements Source {
}
- protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset) throws IOException {
+ protected void createWorkunitForNonPartitionedTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
// Create workunits for tables
try {
@@ -246,7 +248,7 @@ public class HiveSource implements Source {
"Creating workunit for table %s as updateTime %s or createTime %s is greater than low watermark %s",
hiveDataset.getTable().getCompleteName(), updateTime, hiveDataset.getTable().getTTable().getCreateTime(),
lowWatermark.getValue()));
- HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset);
+ HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset, disableAvroCheck);
LongWatermark expectedDatasetHighWatermark =
this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(), tableProcessTime);
@@ -274,15 +276,15 @@ public class HiveSource implements Source {
}
}
- protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
+ protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset, boolean disableAvroCheck) throws IOException {
HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
- if (isAvro(hiveDataset.getTable())) {
+ if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
}
return hiveWorkUnit;
}
- protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
+ protected void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client, boolean disableAvroCheck) throws IOException {
long tableProcessTime = new DateTime().getMillis();
this.watermarker.onTableProcessBegin(hiveDataset.getTable(), tableProcessTime);
@@ -326,7 +328,7 @@ public class HiveSource implements Source {
LongWatermark expectedPartitionHighWatermark = this.watermarker.getExpectedHighWatermark(sourcePartition,
tableProcessTime, partitionProcessTime);
- HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition);
+ HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition, disableAvroCheck);
hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
@@ -354,9 +356,9 @@ public class HiveSource implements Source {
}
}
- protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException {
+ protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition, boolean disableAvroCheck) throws IOException {
HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
- if (isAvro(hiveDataset.getTable())) {
+ if (disableAvroCheck || isAvro(hiveDataset.getTable())) {
hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
}
@@ -410,7 +412,7 @@ public class HiveSource implements Source {
*/
@VisibleForTesting
public boolean isOlderThanLookback(Partition partition) {
- return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
+ return new DateTime(getCreateTime(partition)).isBefore(this.maxLookBackTime);
}
@VisibleForTesting