You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/24 21:28:41 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-753] Refactor HiveRegistrationPolicyBase to surface configStore object

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 423990a  [GOBBLIN-753] Refactor HiveRegistrationPolicyBase to surface configStore object
423990a is described below

commit 423990a0a1cfb29f3e6b1e74c8f5fb35efd1690d
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Apr 24 14:28:34 2019 -0700

    [GOBBLIN-753] Refactor HiveRegistrationPolicyBase to surface configStore object
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    Some refactoring in `HiveRegistrationPolicyBase`
    to make the topic-specific configStore object
    available in extending classes
    
    ### JIRA
    - [x] My PR addresses the following Gobblin JIRA
    issue:
        - https://issues.apache.org/jira/browse/GOBBLIN-753
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Refactor the configStore object in
    HiveRegistrationPolicyBase to make it available to
    extending classes
    
    Refactor more
    
    Closes #2618 from
    autumnust/timeCutOnConfigStoreTableName
---
 .../java/org/apache/gobblin/hive/HiveRegister.java | 20 +++++------
 .../hive/policy/HiveRegistrationPolicyBase.java    | 41 ++++++++++++++--------
 2 files changed, 34 insertions(+), 27 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
index 08a4f10..93dffe2 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveRegister.java
@@ -17,6 +17,12 @@
 
 package org.apache.gobblin.hive;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
@@ -26,17 +32,9 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-
-import com.google.common.base.Optional;
-import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
@@ -47,8 +45,6 @@ import org.apache.gobblin.hive.spec.HiveSpecWithPredicates;
 import org.apache.gobblin.hive.spec.activity.Activity;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.util.executors.ScalingThreadPoolExecutor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
 
 
 /**
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index 1a4ab1d..ee1eb03 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -88,6 +88,8 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
   protected static final ConfigClient configClient =
       org.apache.gobblin.config.client.ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
 
+  protected Optional<Config> configForTopic = Optional.<Config>absent();
+
   /**
    * A valid db or table name should start with an alphanumeric character, and contains only
    * alphanumeric characters and '_'.
@@ -132,6 +134,13 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     this.tableNameSuffix = props.getProp(HIVE_TABLE_NAME_SUFFIX, StringUtils.EMPTY);
     this.emptyInputPathFlag = props.getPropAsBoolean(MAPREDUCE_JOB_INPUT_PATH_EMPTY_KEY, false);
     this.metricContext = Instrumented.getMetricContext(props, HiveRegister.class);
+
+    // Get Topic-specific config object doesn't require any runtime-set properties in prop object, safe to initialize
+    // in constructor.
+    Timer.Context context = this.metricContext.timer(CONFIG_FOR_TOPIC_TIMER).time();
+    configForTopic =
+        ConfigStoreUtils.getConfigForTopic(this.props.getProperties(), KafkaSource.TOPIC_NAME, this.configClient);
+    context.close();
   }
 
   /**
@@ -175,8 +184,8 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
   }
 
   /**
-   * This method first tries to obtain the database name from {@link #HIVE_TABLE_NAME}.
-   * If this property is not specified, it then tries to obtain the database name using
+   * This method first tries to obtain the table name from {@link #HIVE_TABLE_NAME}.
+   * If this property is not specified, it then tries to obtain the table name using
    * the first group of {@link #HIVE_TABLE_REGEX}.
    */
   protected Optional<String> getTableName(Path path) {
@@ -234,13 +243,6 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     if ((primaryTableName = getTableName(path)).isPresent() && !dbPrefix.isPresent()) {
       tableNames.add(primaryTableName.get());
     }
-    Optional<Config> configForTopic = Optional.<Config>absent();
-    if (primaryTableName.isPresent()) {
-      Timer.Context context = this.metricContext.timer(CONFIG_FOR_TOPIC_TIMER).time();
-      configForTopic =
-          ConfigStoreUtils.getConfigForTopic(this.props.getProperties(), KafkaSource.TOPIC_NAME, this.configClient);
-      context.close();
-    }
 
     String additionalNamesProp;
     if (dbPrefix.isPresent()) {
@@ -249,7 +251,8 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
       additionalNamesProp = ADDITIONAL_HIVE_TABLE_NAMES;
     }
 
-    if (configForTopic.isPresent() && configForTopic.get().hasPath(additionalNamesProp)) {
+    // Searching additional table name from ConfigStore-returned object.
+    if (primaryTableName.isPresent() && configForTopic.isPresent() && configForTopic.get().hasPath(additionalNamesProp)) {
       for (String additionalTableName : Splitter.on(",")
           .trimResults()
           .splitToList(configForTopic.get().getString(additionalNamesProp))) {
@@ -357,11 +360,7 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     }
 
     // Setting table-level props.
-    State tableProps = new State(this.props.getTablePartitionProps());
-    if (this.props.getRuntimeTableProps().isPresent()){
-      tableProps.setProp(HiveMetaStoreUtils.RUNTIME_PROPS, this.props.getRuntimeTableProps().get());
-    }
-    table.setProps(tableProps);
+    table.setProps(getRuntimePropsEnrichedTblProps());
 
     table.setStorageProps(this.props.getStorageProps());
     table.setSerDeProps(this.props.getSerdeProps());
@@ -371,6 +370,18 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     return table;
   }
 
+  /**
+   * Enrich the table-level properties with properties carried over from ingestion runtime.
+   * Extend this class to add more runtime properties if required.
+   */
+  protected State getRuntimePropsEnrichedTblProps() {
+    State tableProps = new State(this.props.getTablePartitionProps());
+    if (this.props.getRuntimeTableProps().isPresent()){
+      tableProps.setProp(HiveMetaStoreUtils.RUNTIME_PROPS, this.props.getRuntimeTableProps().get());
+    }
+    return tableProps;
+  }
+
   protected Optional<HivePartition> getPartition(Path path, HiveTable table) throws IOException {
     return Optional.<HivePartition> absent();
   }