You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/21 18:36:41 UTC
incubator-gobblin git commit: [GOBBLIN-561] Handle data completeness
checks for data partitions with no records.
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 5a6bfea9f -> fcd57541a
[GOBBLIN-561] Handle data completeness checks for data partitions with no records.
Closes #2422 from sv2000/compaction
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcd57541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcd57541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcd57541
Branch: refs/heads/master
Commit: fcd57541a9c4a5273fe0836925336ad2af6bd6af
Parents: 5a6bfea
Author: sv2000 <su...@gmail.com>
Authored: Tue Aug 21 11:36:36 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Aug 21 11:36:36 2018 -0700
----------------------------------------------------------------------
.../CompactionAvroJobConfigurator.java | 49 +++++++++++++-------
.../verify/CompactionAuditCountVerifier.java | 13 ++++--
.../gobblin/hive/avro/HiveAvroSerDeManager.java | 18 +++----
.../hive/policy/HiveRegistrationPolicy.java | 1 +
.../hive/policy/HiveRegistrationPolicyBase.java | 19 ++++++--
5 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index c9b7708..1779e33 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -64,6 +64,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.HadoopUtils;
@@ -157,12 +158,14 @@ public class CompactionAvroJobConfigurator {
private void configureSchema(Job job) throws IOException {
Schema newestSchema = MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
- if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) {
- AvroJob.setInputKeySchema(job, newestSchema);
+ if (newestSchema != null) {
+ if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) {
+ AvroJob.setInputKeySchema(job, newestSchema);
+ }
+ AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema);
+ AvroJob.setMapOutputValueSchema(job, newestSchema);
+ AvroJob.setOutputKeySchema(job, newestSchema);
}
- AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema);
- AvroJob.setMapOutputValueSchema(job, newestSchema);
- AvroJob.setOutputKeySchema(job, newestSchema);
}
protected void configureMapper(Job job) {
@@ -228,14 +231,13 @@ public class CompactionAvroJobConfigurator {
}
/**
- * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}
+ * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
+ * @return true if no valid input paths are present for the MR job to process, where a path is
+ * valid if it is a directory containing one or more files.
+ *
*/
- protected void configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException {
-
- this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
- for (Path path: mapReduceInputPaths) {
- FileInputFormat.addInputPath(job, path);
- }
+ protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException {
+ boolean emptyDirectoryFlag = false;
String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
CompactionPathParser parser = new CompactionPathParser(this.state);
@@ -244,7 +246,19 @@ public class CompactionAvroJobConfigurator {
log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
this.fs.delete(mrOutputPath, true);
+
+ this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
+ if (this.mapReduceInputPaths.isEmpty()) {
+ this.mapReduceInputPaths.add(dataset.datasetRoot());
+ emptyDirectoryFlag = true;
+ }
+
+ for (Path path: mapReduceInputPaths) {
+ FileInputFormat.addInputPath(job, path);
+ }
+
FileOutputFormat.setOutputPath(job, mrOutputPath);
+ return emptyDirectoryFlag;
}
/**
@@ -270,14 +284,15 @@ public class CompactionAvroJobConfigurator {
addJars(conf);
Job job = Job.getInstance(conf);
job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
- this.configureInputAndOutputPaths(job, dataset);
+ boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, dataset);
+ if (emptyDirectoryFlag) {
+ this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, true);
+ }
this.configureMapper(job);
this.configureReducer(job);
-
- if (!this.shouldDeduplicate) {
+ if (emptyDirectoryFlag || !this.shouldDeduplicate) {
job.setNumReduceTasks(0);
}
-
// Configure schema at the last step because FilesInputFormat will be used internally
this.configureSchema(job);
this.isJobCreated = true;
@@ -332,7 +347,7 @@ public class CompactionAvroJobConfigurator {
}
total.add(fileStatus.getPath().getParent());
} else {
- uncompacted.add(fileStatus.getPath().getParent());
+ uncompacted.add(fileStatus.getPath().getParent());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 5653281..7c417df 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -151,20 +151,23 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
*/
private Result passed (String datasetName, Map<String, Long> countsByTier, String referenceTier) {
if (!countsByTier.containsKey(this.gobblinTier)) {
- return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, this.gobblinTier));
+ log.info("Missing entry for dataset: " + datasetName + " in gobblin tier: " + this.gobblinTier + "; setting count to 0.");
}
if (!countsByTier.containsKey(referenceTier)) {
- return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, referenceTier));
+ log.info("Missing entry for dataset: " + datasetName + " in reference tier: " + referenceTier + "; setting count to 0.");
}
- long refCount = countsByTier.get(referenceTier);
- long gobblinCount = countsByTier.get(this.gobblinTier);
+ long refCount = countsByTier.getOrDefault(referenceTier, 0L);
+ long gobblinCount = countsByTier.getOrDefault(this.gobblinTier, 0L);
+
+ if (refCount == 0) {
+ return new Result(true, "");
+ }
if ((double) gobblinCount / (double) refCount < this.threshold) {
return new Result (false, String.format("%s failed for %s : gobblin count = %d, %s count = %d (%f < threshold %f)",
this.getName(), datasetName, gobblinCount, referenceTier, refCount, (double) gobblinCount / (double) refCount, this.threshold));
}
-
return new Result(true, "");
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
index b30a2fa..7277c2e 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
@@ -103,11 +103,19 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
*/
@Override
public void addSerDeProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
+ Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
+ Schema schema;
+ try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
+ schema = getDirectorySchema(path);
+ }
+ if (schema == null) {
+ return;
+ }
hiveUnit.setSerDeType(this.serDeWrapper.getSerDe().getClass().getName());
hiveUnit.setInputFormat(this.serDeWrapper.getInputFormatClassName());
hiveUnit.setOutputFormat(this.serDeWrapper.getOutputFormatClassName());
- addSchemaProperties(path, hiveUnit);
+ addSchemaProperties(path, hiveUnit, schema);
}
@Override
@@ -129,17 +137,11 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
}
}
- private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
- Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
-
+ private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit, Schema schema) throws IOException {
Path schemaFile = new Path(path, this.schemaFileName);
if (this.useSchemaFile) {
hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString());
} else {
- Schema schema ;
- try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
- schema = getDirectorySchema(path);
- }
try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_WRITING_TIMER).time()) {
addSchemaFromAvroFile(schema, schemaFile, hiveUnit);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
index 0248c42..174848c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.hive.spec.HiveSpec;
*/
@Alpha
public interface HiveRegistrationPolicy {
+ public static final String MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY = "mapreduce.job.input.path.empty";
/**
* Get a collection of {@link HiveSpec}s for a {@link Path}, which can be used by {@link org.apache.gobblin.hive.HiveRegister}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index d03e8b9..1a4ab1d 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -84,6 +84,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
// {@value PRIMARY_TABLE_TOKEN} if present in {@value ADDITIONAL_HIVE_TABLE_NAMES} or dbPrefix.{@value HIVE_TABLE_NAME}
// .. will be replaced by the table name determined via {@link #getTableName(Path)}
public static final String PRIMARY_TABLE_TOKEN = "$PRIMARY_TABLE";
+
protected static final ConfigClient configClient =
org.apache.gobblin.config.client.ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
@@ -108,6 +109,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
protected final String dbNameSuffix;
protected final String tableNamePrefix;
protected final String tableNameSuffix;
+ protected final boolean emptyInputPathFlag;
protected final MetricContext metricContext;
@@ -128,7 +130,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
this.dbNameSuffix = props.getProp(HIVE_DATABASE_NAME_SUFFIX, StringUtils.EMPTY);
this.tableNamePrefix = props.getProp(HIVE_TABLE_NAME_PREFIX, StringUtils.EMPTY);
this.tableNameSuffix = props.getProp(HIVE_TABLE_NAME_SUFFIX, StringUtils.EMPTY);
-
+ this.emptyInputPathFlag = props.getPropAsBoolean(MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false);
this.metricContext = Instrumented.getMetricContext(props, HiveRegister.class);
}
@@ -341,11 +343,18 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
* @throws IOException
*/
protected HiveTable getTable(Path path, String dbName, String tableName) throws IOException {
- HiveTable table = new HiveTable.Builder().withDbName(dbName).withTableName(tableName)
- .withSerdeManaager(HiveSerDeManager.get(this.props)).build();
+ HiveTable.Builder tableBuilder = new HiveTable.Builder().withDbName(dbName).withTableName(tableName);
+
+ if (!this.emptyInputPathFlag) {
+ tableBuilder = tableBuilder.withSerdeManaager(HiveSerDeManager.get(this.props));
+ }
+ HiveTable table = tableBuilder.build();
table.setLocation(this.fs.makeQualified(getTableLocation(path)).toString());
- table.setSerDeProps(path);
+
+ if (!this.emptyInputPathFlag) {
+ table.setSerDeProps(path);
+ }
// Setting table-level props.
State tableProps = new State(this.props.getTablePartitionProps());
@@ -418,4 +427,4 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
"Unable to instantiate " + HiveRegistrationPolicy.class.getSimpleName() + " with type " + policyType, e);
}
}
-}
+}
\ No newline at end of file