Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/08/21 18:36:41 UTC

incubator-gobblin git commit: [GOBBLIN-561] Handle data completeness checks for data partitions with no records.

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 5a6bfea9f -> fcd57541a


[GOBBLIN-561] Handle data completeness checks for data partitions with no records.

Closes #2422 from sv2000/compaction


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/fcd57541
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/fcd57541
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/fcd57541

Branch: refs/heads/master
Commit: fcd57541a9c4a5273fe0836925336ad2af6bd6af
Parents: 5a6bfea
Author: sv2000 <su...@gmail.com>
Authored: Tue Aug 21 11:36:36 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Aug 21 11:36:36 2018 -0700

----------------------------------------------------------------------
 .../CompactionAvroJobConfigurator.java          | 49 +++++++++++++-------
 .../verify/CompactionAuditCountVerifier.java    | 13 ++++--
 .../gobblin/hive/avro/HiveAvroSerDeManager.java | 18 +++----
 .../hive/policy/HiveRegistrationPolicy.java     |  1 +
 .../hive/policy/HiveRegistrationPolicyBase.java | 19 ++++++--
 5 files changed, 65 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
index c9b7708..1779e33 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/mapreduce/CompactionAvroJobConfigurator.java
@@ -64,6 +64,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.FileSystemDataset;
+import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.HadoopUtils;
@@ -157,12 +158,14 @@ public class CompactionAvroJobConfigurator {
 
   private void configureSchema(Job job) throws IOException {
     Schema newestSchema = MRCompactorAvroKeyDedupJobRunner.getNewestSchemaFromSource(job, this.fs);
-    if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) {
-      AvroJob.setInputKeySchema(job, newestSchema);
+    if (newestSchema != null) {
+      if (this.state.getPropAsBoolean(MRCompactorAvroKeyDedupJobRunner.COMPACTION_JOB_AVRO_SINGLE_INPUT_SCHEMA, true)) {
+        AvroJob.setInputKeySchema(job, newestSchema);
+      }
+      AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema);
+      AvroJob.setMapOutputValueSchema(job, newestSchema);
+      AvroJob.setOutputKeySchema(job, newestSchema);
     }
-    AvroJob.setMapOutputKeySchema(job, this.shouldDeduplicate ? getKeySchema(job, newestSchema) : newestSchema);
-    AvroJob.setMapOutputValueSchema(job, newestSchema);
-    AvroJob.setOutputKeySchema(job, newestSchema);
   }
 
   protected void configureMapper(Job job) {
@@ -228,14 +231,13 @@ public class CompactionAvroJobConfigurator {
   }
 
   /**
-   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}
+   * Refer to {@link MRCompactorAvroKeyDedupJobRunner#configureInputAndOutputPaths(Job)}.
+   * @return true if no valid input paths are present for the MR job to process, where a path is valid if it is
+   * a directory containing one or more files.
+   *
    */
-  protected void configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException {
-
-    this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
-    for (Path path: mapReduceInputPaths) {
-      FileInputFormat.addInputPath(job, path);
-    }
+  protected boolean configureInputAndOutputPaths(Job job, FileSystemDataset dataset) throws IOException {
+    boolean emptyDirectoryFlag = false;
 
     String mrOutputBase = this.state.getProp(MRCompactor.COMPACTION_JOB_DIR);
     CompactionPathParser parser = new CompactionPathParser(this.state);
@@ -244,7 +246,19 @@ public class CompactionAvroJobConfigurator {
 
     log.info ("Cleaning temporary MR output directory: " + mrOutputPath);
     this.fs.delete(mrOutputPath, true);
+
+    this.mapReduceInputPaths = getGranularInputPaths(dataset.datasetRoot());
+    if (this.mapReduceInputPaths.isEmpty()) {
+      this.mapReduceInputPaths.add(dataset.datasetRoot());
+      emptyDirectoryFlag = true;
+    }
+
+    for (Path path: mapReduceInputPaths) {
+      FileInputFormat.addInputPath(job, path);
+    }
+
     FileOutputFormat.setOutputPath(job, mrOutputPath);
+    return emptyDirectoryFlag;
   }
 
   /**
@@ -270,14 +284,15 @@ public class CompactionAvroJobConfigurator {
     addJars(conf);
     Job job = Job.getInstance(conf);
     job.setJobName(MRCompactorJobRunner.HADOOP_JOB_NAME);
-    this.configureInputAndOutputPaths(job, dataset);
+    boolean emptyDirectoryFlag = this.configureInputAndOutputPaths(job, dataset);
+    if (emptyDirectoryFlag) {
+      this.state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, true);
+    }
     this.configureMapper(job);
     this.configureReducer(job);
-
-    if (!this.shouldDeduplicate) {
+    if (emptyDirectoryFlag || !this.shouldDeduplicate) {
       job.setNumReduceTasks(0);
     }
-
     // Configure schema at the last step because FilesInputFormat will be used internally
     this.configureSchema(job);
     this.isJobCreated = true;
@@ -332,7 +347,7 @@ public class CompactionAvroJobConfigurator {
         }
         total.add(fileStatus.getPath().getParent());
       } else {
-        uncompacted.add(fileStatus.getPath().getParent());
+          uncompacted.add(fileStatus.getPath().getParent());
       }
     }
 

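For context, a minimal self-contained sketch of the empty-input handling the configurator hunks above introduce: when no granular input paths are found, the dataset root is used as the (empty) MR input, the reduce phase is disabled, and a flag is recorded so Hive registration can adapt. Everything other than the Hadoop APIs (Job, FileInputFormat, FileOutputFormat, FileSystem) is a hypothetical stand-in, not the Gobblin code itself.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class EmptyInputJobSetupSketch {

  /**
   * Mirrors configureInputAndOutputPaths: wires input/output paths and reports
   * whether the dataset had no input files. inputPaths stands in for
   * getGranularInputPaths(dataset.datasetRoot()).
   */
  static boolean configurePaths(Job job, FileSystem fs, List<Path> inputPaths,
      Path datasetRoot, Path mrOutputPath) throws IOException {
    boolean emptyInput = false;

    // Clean the temporary MR output directory before (re)using it.
    fs.delete(mrOutputPath, true);

    // No valid (non-empty) input directories: fall back to the dataset root so the
    // job still has an input path, and remember that the partition was empty.
    if (inputPaths.isEmpty()) {
      inputPaths.add(datasetRoot);
      emptyInput = true;
    }
    for (Path path : inputPaths) {
      FileInputFormat.addInputPath(job, path);
    }
    FileOutputFormat.setOutputPath(job, mrOutputPath);
    return emptyInput;
  }

  /** Mirrors the createJob follow-up: an empty input disables the reduce phase. */
  static void applyEmptyInputFlag(Job job, boolean emptyInput, boolean shouldDeduplicate) {
    if (emptyInput || !shouldDeduplicate) {
      job.setNumReduceTasks(0);
    }
    // In the actual change, the flag is also written to the job state under
    // HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY.
  }
}
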
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
----------------------------------------------------------------------
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
index 5653281..7c417df 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionAuditCountVerifier.java
@@ -151,20 +151,23 @@ public class CompactionAuditCountVerifier implements CompactionVerifier<FileSyst
    */
   private Result passed (String datasetName, Map<String, Long> countsByTier, String referenceTier) {
     if (!countsByTier.containsKey(this.gobblinTier)) {
-      return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, this.gobblinTier));
+      log.info("Missing entry for dataset: " + datasetName + " in gobblin tier: " + this.gobblinTier + "; setting count to 0.");
     }
     if (!countsByTier.containsKey(referenceTier)) {
-      return new Result(false, String.format("Failed to get audit count for topic %s, tier %s", datasetName, referenceTier));
+      log.info("Missing entry for dataset: " + datasetName + " in reference tier: " + referenceTier + "; setting count to 0.");
     }
 
-    long refCount = countsByTier.get(referenceTier);
-    long gobblinCount = countsByTier.get(this.gobblinTier);
+    long refCount = countsByTier.getOrDefault(referenceTier, 0L);
+    long gobblinCount = countsByTier.getOrDefault(this.gobblinTier, 0L);
+
+    if (refCount == 0) {
+      return new Result(true, "");
+    }
 
     if ((double) gobblinCount / (double) refCount < this.threshold) {
       return new Result (false, String.format("%s failed for %s : gobblin count = %d, %s count = %d (%f < threshold %f)",
               this.getName(), datasetName, gobblinCount, referenceTier, refCount, (double) gobblinCount / (double) refCount, this.threshold));
     }
-
     return new Result(true, "");
   }
 

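A self-contained sketch of the relaxed audit comparison above: missing tier counts now default to zero, a zero reference count (an empty partition) passes outright, and otherwise the usual ratio-versus-threshold check applies. The tier names and numbers below are illustrative, not taken from the commit.

import java.util.HashMap;
import java.util.Map;

public class AuditCountCheckSketch {

  /** Returns true when the gobblin tier captured enough of the reference tier's records. */
  static boolean passes(Map<String, Long> countsByTier, String gobblinTier,
      String referenceTier, double threshold) {
    // Missing entries default to 0 instead of failing the verification outright.
    long refCount = countsByTier.getOrDefault(referenceTier, 0L);
    long gobblinCount = countsByTier.getOrDefault(gobblinTier, 0L);

    // A partition with no records in the reference tier is considered complete.
    if (refCount == 0) {
      return true;
    }
    return (double) gobblinCount / (double) refCount >= threshold;
  }

  public static void main(String[] args) {
    Map<String, Long> counts = new HashMap<>();
    counts.put("producer", 1000L);
    counts.put("gobblin", 950L);
    System.out.println(passes(counts, "gobblin", "producer", 0.97)); // 0.95 < 0.97 -> false
    System.out.println(passes(new HashMap<>(), "gobblin", "producer", 0.97)); // no records -> true
  }
}
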
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
index b30a2fa..7277c2e 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/avro/HiveAvroSerDeManager.java
@@ -103,11 +103,19 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
    */
   @Override
   public void addSerDeProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
+    Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
+    Schema schema;
+    try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
+      schema = getDirectorySchema(path);
+    }
+    if (schema == null) {
+      return;
+    }
     hiveUnit.setSerDeType(this.serDeWrapper.getSerDe().getClass().getName());
     hiveUnit.setInputFormat(this.serDeWrapper.getInputFormatClassName());
     hiveUnit.setOutputFormat(this.serDeWrapper.getOutputFormatClassName());
 
-    addSchemaProperties(path, hiveUnit);
+    addSchemaProperties(path, hiveUnit, schema);
   }
 
   @Override
@@ -129,17 +137,11 @@ public class HiveAvroSerDeManager extends HiveSerDeManager {
     }
   }
 
-  private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
-    Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
-
+  private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit, Schema schema) throws IOException {
     Path schemaFile = new Path(path, this.schemaFileName);
     if (this.useSchemaFile) {
       hiveUnit.setSerDeProp(SCHEMA_URL, schemaFile.toString());
     } else {
-      Schema schema ;
-      try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
-        schema = getDirectorySchema(path);
-      }
       try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_WRITING_TIMER).time()) {
         addSchemaFromAvroFile(schema, schemaFile, hiveUnit);
       }

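A sketch of the reordered SerDe-property flow above: the directory check and the timed schema read now happen first, and a null schema, i.e. a partition directory with no Avro records, short-circuits before any SerDe type, input/output format, or schema properties are set. The SchemaReader interface below is a hypothetical stand-in for getDirectorySchema, not the Gobblin API.

import java.io.IOException;
import java.util.Optional;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DirectorySchemaSketch {

  /** Hypothetical stand-in for HiveAvroSerDeManager#getDirectorySchema. */
  interface SchemaReader {
    Schema read(Path dir) throws IOException;
  }

  /**
   * Returns the schema of the Avro files under dir, or Optional.empty() when the
   * directory holds no records, in which case the caller skips SerDe wiring entirely.
   */
  static Optional<Schema> resolveDirectorySchema(FileSystem fs, Path dir, SchemaReader reader)
      throws IOException {
    if (!fs.getFileStatus(dir).isDirectory()) {
      throw new IllegalArgumentException(dir + " is not a directory.");
    }
    return Optional.ofNullable(reader.read(dir));
  }
}
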
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
index 0248c42..174848c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicy.java
@@ -33,6 +33,7 @@ import org.apache.gobblin.hive.spec.HiveSpec;
  */
 @Alpha
 public interface HiveRegistrationPolicy {
+  public static final String MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY = "mapreduce.job.input.path.empty";
 
   /**
    * Get a collection of {@link HiveSpec}s for a {@link Path}, which can be used by {@link org.apache.gobblin.hive.HiveRegister}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/fcd57541/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index d03e8b9..1a4ab1d 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -84,6 +84,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
   // {@value PRIMARY_TABLE_TOKEN} if present in {@value ADDITIONAL_HIVE_TABLE_NAMES} or dbPrefix.{@value HIVE_TABLE_NAME}
   // .. will be replaced by the table name determined via {@link #getTableName(Path)}
   public static final String PRIMARY_TABLE_TOKEN = "$PRIMARY_TABLE";
+
   protected static final ConfigClient configClient =
       org.apache.gobblin.config.client.ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
 
@@ -108,6 +109,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
   protected final String dbNameSuffix;
   protected final String tableNamePrefix;
   protected final String tableNameSuffix;
+  protected final boolean emptyInputPathFlag;
 
   protected final MetricContext metricContext;
 
@@ -128,7 +130,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     this.dbNameSuffix = props.getProp(HIVE_DATABASE_NAME_SUFFIX, StringUtils.EMPTY);
     this.tableNamePrefix = props.getProp(HIVE_TABLE_NAME_PREFIX, StringUtils.EMPTY);
     this.tableNameSuffix = props.getProp(HIVE_TABLE_NAME_SUFFIX, StringUtils.EMPTY);
-
+    this.emptyInputPathFlag = props.getPropAsBoolean(MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false);
     this.metricContext = Instrumented.getMetricContext(props, HiveRegister.class);
   }
 
@@ -341,11 +343,18 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
    * @throws IOException
    */
   protected HiveTable getTable(Path path, String dbName, String tableName) throws IOException {
-    HiveTable table = new HiveTable.Builder().withDbName(dbName).withTableName(tableName)
-        .withSerdeManaager(HiveSerDeManager.get(this.props)).build();
 
+    HiveTable.Builder tableBuilder = new HiveTable.Builder().withDbName(dbName).withTableName(tableName);
+
+    if (!this.emptyInputPathFlag) {
+      tableBuilder = tableBuilder.withSerdeManaager(HiveSerDeManager.get(this.props));
+    }
+    HiveTable table = tableBuilder.build();
     table.setLocation(this.fs.makeQualified(getTableLocation(path)).toString());
-    table.setSerDeProps(path);
+
+    if (!this.emptyInputPathFlag) {
+      table.setSerDeProps(path);
+    }
 
     // Setting table-level props.
     State tableProps = new State(this.props.getTablePartitionProps());
@@ -418,4 +427,4 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
           "Unable to instantiate " + HiveRegistrationPolicy.class.getSimpleName() + " with type " + policyType, e);
     }
   }
-}
+}
\ No newline at end of file
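
Taken together, the new mapreduce.job.input.path.empty property is the hand-off between the two halves of the change: the compaction job sets it when a partition had no records, and HiveRegistrationPolicyBase reads it to build the table without a SerDe manager or SerDe properties. A minimal sketch of that round trip, using only the State accessors already shown in the diff:

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.policy.HiveRegistrationPolicy;

public class EmptyInputPropRoundTripSketch {
  public static void main(String[] args) {
    // Producer side: the compaction job flags a run whose partition had no records.
    State state = new State();
    state.setProp(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, true);

    // Consumer side: the registration policy reads the flag (defaulting to false)
    // and, when set, registers the table location only, skipping SerDe wiring.
    boolean emptyInputPathFlag =
        state.getPropAsBoolean(HiveRegistrationPolicy.MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false);
    System.out.println("skip SerDe properties: " + emptyInputPathFlag);
  }
}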