You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/12 23:06:17 UTC

incubator-gobblin git commit: [GOBBLIN-489] Implement PusherFactory

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master d98f77c14 -> 261819c63


[GOBBLIN-489] Implement PusherFactory

Closes #2359 from zxcware/broker


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/261819c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/261819c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/261819c6

Branch: refs/heads/master
Commit: 261819c634710031466758d88b549b1f5aa3bd73
Parents: d98f77c
Author: zhchen <zh...@linkedin.com>
Authored: Tue Jun 12 16:05:37 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jun 12 16:05:45 2018 -0700

----------------------------------------------------------------------
 .../apache/gobblin/broker/BrokerConstants.java  |   2 +
 .../kafka/GobblinScopePusherFactory.java        |  20 ++++
 .../gobblin/metrics/kafka/PusherFactory.java    |  55 ++++++++++
 .../metrics/kafka/PusherFactoryTest.java        | 102 +++++++++++++++++++
 .../broker/SharedResourcesBrokerFactory.java    |  26 ++++-
 .../SharedResourcesBrokerFactoryTest.java       |  31 ++++++
 6 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
index c29a162..14da49f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
@@ -22,4 +22,6 @@ package org.apache.gobblin.broker;
  */
 public class BrokerConstants {
   public static final String GOBBLIN_BROKER_CONFIG_PREFIX = "gobblin.broker";
+
+  public static final String GOBBLIN_BROKER_CONFIG_NAMESPACES = "gobblin.brokerNamespaces";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
new file mode 100644
index 0000000..a228394
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
@@ -0,0 +1,20 @@
+package org.apache.gobblin.metrics.kafka;
+
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link PusherFactory} that creates a shared {@link Pusher} instance
+ * in {@link GobblinScopeTypes}
+ */
+public class GobblinScopePusherFactory<T> extends PusherFactory<T, GobblinScopeTypes> {
+  @Override
+  public GobblinScopeTypes getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker,
+      ConfigView<GobblinScopeTypes, StringNameSharedResourceKey> config) {
+    // By default, a job level resource
+    return GobblinScopeTypes.JOB;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
new file mode 100644
index 0000000..08cbc7e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
@@ -0,0 +1,55 @@
+package org.apache.gobblin.metrics.kafka;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * Basic resource factory that creates a shared {@link Pusher} instance
+ */
+@Slf4j
+public abstract class PusherFactory<T, S extends ScopeType<S>> implements SharedResourceFactory<Pusher<T>, StringNameSharedResourceKey, S> {
+  private static final String FACTORY_NAME = "pusher";
+  private static final String PUSHER_CLASS = "class";
+
+  private static final Config FALLBACK = ConfigFactory.parseMap(
+      ImmutableMap.<String, Object>builder()
+          .put(PUSHER_CLASS, LoggingPusher.class.getName())
+          .build());
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  @Override
+  public SharedResourceFactoryResponse<Pusher<T>> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, StringNameSharedResourceKey> config)
+      throws NotConfiguredException {
+    Config pusherConfig = config.getConfig().withFallback(FALLBACK);
+    String pusherClass = pusherConfig.getString(PUSHER_CLASS);
+
+    Pusher<T> pusher;
+    try {
+      pusher = (Pusher) ConstructorUtils.invokeConstructor(Class.forName(pusherClass), pusherConfig);
+    } catch (ReflectiveOperationException e) {
+      log.warn("Unable to construct a pusher with class {}. LoggingPusher will be used", pusherClass, e);
+      pusher = new LoggingPusher<>();
+    }
+    return new ResourceInstance<>(pusher);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
new file mode 100644
index 0000000..ad48a1e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
@@ -0,0 +1,102 @@
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * Test {@link PusherFactory}s
+ */
+public class PusherFactoryTest {
+
+  @Test
+  private void testCreateGobblinScopedDefaultPusher()
+      throws NotConfiguredException {
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder(
+        new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build();
+
+    StringNameSharedResourceKey key = new StringNameSharedResourceKey("test");
+
+    Pusher<Object> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key);
+    Assert.assertEquals(pusher.getClass(), LoggingPusher.class);
+
+    try {
+      jobBroker.close();
+      instanceBroker.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  private void testCreateGobblinScopedCustomPusher()
+      throws NotConfiguredException {
+    Map<String, String> configAsMap = new HashMap<>();
+    configAsMap.put("gobblin.broker.pusher.class", TestPusher.class.getName());
+    configAsMap.put("gobblin.broker.pusher.id", "sharedId");
+    configAsMap.put("gobblin.broker.pusher.testPusher.id", "testPusherId");
+    configAsMap.put("gobblin.broker.pusher.testPusher.name", "testPusherName");
+
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.parseMap(configAsMap), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder(
+        new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build();
+
+    StringNameSharedResourceKey key = new StringNameSharedResourceKey("testPusher");
+
+    Pusher<String> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key);
+
+    Assert.assertEquals(pusher.getClass(), TestPusher.class);
+    TestPusher testPusher = (TestPusher) pusher;
+    Assert.assertTrue(!testPusher.isClosed);
+    Assert.assertEquals(testPusher.id, "testPusherId");
+    Assert.assertEquals(testPusher.name, "testPusherName");
+
+    try {
+      jobBroker.close();
+      instanceBroker.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    Assert.assertTrue(testPusher.isClosed);
+  }
+
+  public static class TestPusher implements Pusher<String> {
+    private boolean isClosed = false;
+    private final String id;
+    private final String name;
+
+    public TestPusher(Config config) {
+      id = config.getString("id");
+      name = config.getString("name");
+    }
+
+    @Override
+    public void pushMessages(List<String> messages) {
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      isClosed = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
index 06d705a..bdb85ac 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
@@ -21,6 +21,8 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -54,6 +56,10 @@ public class SharedResourcesBrokerFactory {
   public static final String BROKER_CONF_FILE_KEY = BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX + ".configuration";
   public static final String DEFAULT_BROKER_CONF_FILE = "gobblinBroker.conf";
 
+  private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
+  private static final Config BROKER_NAMESPACES_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+      .put(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES, "").build());
+
   /**
    * Create a root {@link SharedResourcesBroker}. Subscoped brokers should be built using
    * {@link SharedResourcesBroker#newSubscopedBuilder(ScopeInstance)}.
@@ -78,7 +84,7 @@ public class SharedResourcesBrokerFactory {
     return new SharedResourcesBrokerImpl<>(new DefaultBrokerCache<S>(),
         scopeWrapper,
         Lists.newArrayList(new SharedResourcesBrokerImpl.ScopedConfig<>(globalScope.getType(),
-            ConfigUtils.getConfigOrEmpty(addSystemConfigurationToConfig(config), BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX))),
+            getBrokerConfig(addSystemConfigurationToConfig(config)))),
         ImmutableMap.of(globalScope.getType(), scopeWrapper));
   }
 
@@ -86,6 +92,24 @@ public class SharedResourcesBrokerFactory {
   private static SharedResourcesBroker<SimpleScopeType> SINGLETON;
 
   /**
+   * Get all broker configurations from the given {@code srcConfig}. Configurations from
+   * {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_PREFIX} are always loaded first, then in order from the namespaces,
+   * which are encoded as a comma-separated string keyed by {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_NAMESPACES}.
+   */
+  @VisibleForTesting
+  static Config getBrokerConfig(Config srcConfig) {
+    Config allSrcConfig = srcConfig.withFallback(BROKER_NAMESPACES_FALLBACK);
+    String namespaces = allSrcConfig.getString(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES);
+    Config brokerConfig = ConfigUtils.getConfigOrEmpty(allSrcConfig, BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX);
+
+    for (String namespace : LIST_SPLITTER.splitToList(namespaces)) {
+      brokerConfig = brokerConfig.withFallback(ConfigUtils.getConfigOrEmpty(allSrcConfig, namespace));
+    }
+
+    return brokerConfig;
+  }
+
+  /**
    * Get the implicit {@link SharedResourcesBroker} in the callers thread. This is either a singleton broker configured
    * from environment variables, java options, and classpath configuration options, or a specific broker injected
    * elsewhere in the application.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
index 1b1c3e8..2df48a9 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin.broker;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -67,6 +69,35 @@ public class SharedResourcesBrokerFactoryTest {
     Assert.assertEquals(configView.getConfig().getString("testKey"), "testValue");
   }
 
+  @Test
+  public void testGetBrokerConfig() {
+    Map<String, String> srcConfigMap = new HashMap<>();
+    srcConfigMap.put("gobblin.broker.key1", "value1");
+
+    // Test global namespace, "gobblin.broker"
+    Config brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+    Config expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1"));
+    Assert.assertEquals(brokerConfig, expectedConfig);
+
+    // Test extra namespace, "gobblin.shared"
+    srcConfigMap.put("gobblin.shared.key2", "value2");
+    srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared");
+    brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+    expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1","key2", "value2"));
+    Assert.assertEquals(brokerConfig, expectedConfig);
+
+    // Test a list of extra namespaces, configurations are respected in order
+    srcConfigMap.put("gobblin.shared.key2", "value2");
+    srcConfigMap.put("gobblin.shared.key3", "value3");
+    srcConfigMap.put("gobblin.shared2.key3", "value3x");
+    srcConfigMap.put("gobblin.shared2.key4", "value4");
+    srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared, gobblin.shared2");
+    brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+    expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1", "key2", "value2",
+        "key3", "value3", "key4", "value4"));
+    Assert.assertEquals(brokerConfig, expectedConfig);
+  }
+
   public static class ImplicitBrokerTest implements Runnable {
 
     @Override