You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/06/12 23:06:17 UTC
incubator-gobblin git commit: [GOBBLIN-489] Implement PusherFactory
Repository: incubator-gobblin
Updated Branches:
refs/heads/master d98f77c14 -> 261819c63
[GOBBLIN-489] Implement PusherFactory
Closes #2359 from zxcware/broker
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/261819c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/261819c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/261819c6
Branch: refs/heads/master
Commit: 261819c634710031466758d88b549b1f5aa3bd73
Parents: d98f77c
Author: zhchen <zh...@linkedin.com>
Authored: Tue Jun 12 16:05:37 2018 -0700
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jun 12 16:05:45 2018 -0700
----------------------------------------------------------------------
.../apache/gobblin/broker/BrokerConstants.java | 2 +
.../kafka/GobblinScopePusherFactory.java | 20 ++++
.../gobblin/metrics/kafka/PusherFactory.java | 55 ++++++++++
.../metrics/kafka/PusherFactoryTest.java | 102 +++++++++++++++++++
.../broker/SharedResourcesBrokerFactory.java | 26 ++++-
.../SharedResourcesBrokerFactoryTest.java | 31 ++++++
6 files changed, 235 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
index c29a162..14da49f 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/broker/BrokerConstants.java
@@ -22,4 +22,6 @@ package org.apache.gobblin.broker;
*/
public class BrokerConstants {
+ /** Config prefix under which broker configuration is looked up. */
public static final String GOBBLIN_BROKER_CONFIG_PREFIX = "gobblin.broker";
+
+ /**
+ * Config key naming additional namespaces to read broker configuration from.
+ * SharedResourcesBrokerFactory#getBrokerConfig reads the value as a comma separated string.
+ */
+ public static final String GOBBLIN_BROKER_CONFIG_NAMESPACES = "gobblin.brokerNamespaces";
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
new file mode 100644
index 0000000..a228394
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/GobblinScopePusherFactory.java
@@ -0,0 +1,20 @@
+package org.apache.gobblin.metrics.kafka;
+
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.ConfigView;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * A {@link PusherFactory} to create a shared {@link Pusher} instance
+ * in {@link GobblinScopeTypes}
+ *
+ * @param <T> type of the messages accepted by the created {@link Pusher}
+ */
+public class GobblinScopePusherFactory<T> extends PusherFactory<T, GobblinScopeTypes> {
+ /**
+ * Always selects {@link GobblinScopeTypes#JOB}; neither {@code broker} nor {@code config} is consulted.
+ */
+ @Override
+ public GobblinScopeTypes getAutoScope(SharedResourcesBroker<GobblinScopeTypes> broker,
+ ConfigView<GobblinScopeTypes, StringNameSharedResourceKey> config) {
+ // By default, a job level resource
+ return GobblinScopeTypes.JOB;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
new file mode 100644
index 0000000..08cbc7e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/PusherFactory.java
@@ -0,0 +1,55 @@
+package org.apache.gobblin.metrics.kafka;
+
+import org.apache.commons.lang3.reflect.ConstructorUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.ResourceInstance;
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.ScopeType;
+import org.apache.gobblin.broker.iface.ScopedConfigView;
+import org.apache.gobblin.broker.iface.SharedResourceFactory;
+import org.apache.gobblin.broker.iface.SharedResourceFactoryResponse;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * Basic resource factory to create shared {@link Pusher} instance.
+ *
+ * <p>The implementation class name is read from the {@code class} key of the factory's config view and the
+ * class is instantiated reflectively with that config as the single constructor argument. When the key is
+ * absent, {@link LoggingPusher} is used.
+ *
+ * @param <T> type of the messages accepted by the created {@link Pusher}
+ * @param <S> scope type of the broker the {@link Pusher} is shared in
+ */
+@Slf4j
+public abstract class PusherFactory<T, S extends ScopeType<S>>
+    implements SharedResourceFactory<Pusher<T>, StringNameSharedResourceKey, S> {
+  private static final String FACTORY_NAME = "pusher";
+  private static final String PUSHER_CLASS = "class";
+
+  // Default pusher class used when the config view does not name one.
+  private static final Config FALLBACK = ConfigFactory.parseMap(
+      ImmutableMap.<String, Object>builder()
+          .put(PUSHER_CLASS, LoggingPusher.class.getName())
+          .build());
+
+  @Override
+  public String getName() {
+    return FACTORY_NAME;
+  }
+
+  /**
+   * Creates the {@link Pusher} named by the {@code class} key of {@code config}.
+   *
+   * <p>If the class cannot be loaded or constructed, or does not implement {@link Pusher}, a warning is
+   * logged and a {@link LoggingPusher} is returned instead.
+   *
+   * @param broker broker requesting the resource; not used by this implementation
+   * @param config config view for the requested key
+   * @return a {@link ResourceInstance} wrapping the created {@link Pusher}
+   */
+  @Override
+  public SharedResourceFactoryResponse<Pusher<T>> createResource(SharedResourcesBroker<S> broker,
+      ScopedConfigView<S, StringNameSharedResourceKey> config)
+      throws NotConfiguredException {
+    Config pusherConfig = config.getConfig().withFallback(FALLBACK);
+    String pusherClass = pusherConfig.getString(PUSHER_CLASS);
+
+    Pusher<T> pusher;
+    try {
+      // asSubclass rejects a class that is not a Pusher before anything is instantiated, so a
+      // misconfigured class name takes the same fallback path as a missing class.
+      Class<? extends Pusher> pusherType = Class.forName(pusherClass).asSubclass(Pusher.class);
+      // The message type T is erased at runtime and cannot be verified here.
+      @SuppressWarnings("unchecked")
+      Pusher<T> created = (Pusher<T>) ConstructorUtils.invokeConstructor(pusherType, pusherConfig);
+      pusher = created;
+    } catch (ReflectiveOperationException | ClassCastException e) {
+      log.warn("Unable to construct a pusher with class {}. LoggingPusher will be used", pusherClass, e);
+      pusher = new LoggingPusher<>();
+    }
+    return new ResourceInstance<>(pusher);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
new file mode 100644
index 0000000..ad48a1e
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/metrics/kafka/PusherFactoryTest.java
@@ -0,0 +1,102 @@
+package org.apache.gobblin.metrics.kafka;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.StringNameSharedResourceKey;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.NotConfiguredException;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+
+
+/**
+ * Test {@link PusherFactory}s
+ */
+public class PusherFactoryTest {
+
+  /**
+   * With no pusher configuration at all, the factory is expected to hand out a {@link LoggingPusher}.
+   */
+  // Test methods must be public: TestNG does not run non-public methods annotated with @Test.
+  @Test
+  public void testCreateGobblinScopedDefaultPusher()
+      throws NotConfiguredException, IOException {
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.empty(), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder(
+        new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build();
+
+    StringNameSharedResourceKey key = new StringNameSharedResourceKey("test");
+
+    Pusher<Object> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key);
+    Assert.assertEquals(pusher.getClass(), LoggingPusher.class);
+
+    // A failure to close now fails the test instead of being printed and ignored.
+    jobBroker.close();
+    instanceBroker.close();
+  }
+
+  /**
+   * With a custom pusher class configured, the factory is expected to construct it with the config of the
+   * requested key ("testPusher") and the pusher is expected to be closed once the brokers are closed.
+   */
+  @Test
+  public void testCreateGobblinScopedCustomPusher()
+      throws NotConfiguredException, IOException {
+    Map<String, String> configAsMap = new HashMap<>();
+    configAsMap.put("gobblin.broker.pusher.class", TestPusher.class.getName());
+    // Factory-level id; the key-level id below is the one the assertions expect to see.
+    configAsMap.put("gobblin.broker.pusher.id", "sharedId");
+    configAsMap.put("gobblin.broker.pusher.testPusher.id", "testPusherId");
+    configAsMap.put("gobblin.broker.pusher.testPusher.name", "testPusherName");
+
+    SharedResourcesBroker<GobblinScopeTypes> instanceBroker = SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(ConfigFactory.parseMap(configAsMap), GobblinScopeTypes.GLOBAL.defaultScopeInstance());
+    SharedResourcesBroker<GobblinScopeTypes> jobBroker = instanceBroker.newSubscopedBuilder(
+        new JobScopeInstance("PusherFactoryTest", String.valueOf(System.currentTimeMillis()))).build();
+
+    StringNameSharedResourceKey key = new StringNameSharedResourceKey("testPusher");
+
+    Pusher<String> pusher = jobBroker.getSharedResource(new GobblinScopePusherFactory<>(), key);
+
+    Assert.assertEquals(pusher.getClass(), TestPusher.class);
+    TestPusher testPusher = (TestPusher) pusher;
+    Assert.assertFalse(testPusher.isClosed);
+    Assert.assertEquals(testPusher.id, "testPusherId");
+    Assert.assertEquals(testPusher.name, "testPusherName");
+
+    // A failure to close now fails the test instead of being printed and ignored.
+    jobBroker.close();
+    instanceBroker.close();
+
+    Assert.assertTrue(testPusher.isClosed);
+  }
+
+  /**
+   * {@link Pusher} stub that records the {@code id} and {@code name} it was configured with and whether
+   * {@link #close()} has been called. Pushed messages are discarded.
+   */
+  public static class TestPusher implements Pusher<String> {
+    private boolean isClosed = false;
+    private final String id;
+    private final String name;
+
+    public TestPusher(Config config) {
+      id = config.getString("id");
+      name = config.getString("name");
+    }
+
+    @Override
+    public void pushMessages(List<String> messages) {
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+      isClosed = true;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
index 06d705a..bdb85ac 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/broker/SharedResourcesBrokerFactory.java
@@ -21,6 +21,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -54,6 +56,10 @@ public class SharedResourcesBrokerFactory {
public static final String BROKER_CONF_FILE_KEY = BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX + ".configuration";
public static final String DEFAULT_BROKER_CONF_FILE = "gobblinBroker.conf";
+ // Splits a comma separated list, trimming whitespace around entries and dropping empty entries.
+ private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
+ // Supplies an empty namespace list when the namespaces key is not configured.
+ private static final Config BROKER_NAMESPACES_FALLBACK = ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+ .put(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES, "").build());
+
/**
* Create a root {@link SharedResourcesBroker}. Subscoped brokers should be built using
* {@link SharedResourcesBroker#newSubscopedBuilder(ScopeInstance)}.
@@ -78,7 +84,7 @@ public class SharedResourcesBrokerFactory {
return new SharedResourcesBrokerImpl<>(new DefaultBrokerCache<S>(),
scopeWrapper,
Lists.newArrayList(new SharedResourcesBrokerImpl.ScopedConfig<>(globalScope.getType(),
- ConfigUtils.getConfigOrEmpty(addSystemConfigurationToConfig(config), BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX))),
+ getBrokerConfig(addSystemConfigurationToConfig(config)))),
ImmutableMap.of(globalScope.getType(), scopeWrapper));
}
@@ -86,6 +92,24 @@ public class SharedResourcesBrokerFactory {
private static SharedResourcesBroker<SimpleScopeType> SINGLETON;
/**
+ * Get all broker configurations from the given {@code srcConfig}. Configurations from
+ * {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_PREFIX} are always loaded first, then in-order from namespaces,
+ * which are encoded as a comma separated string keyed by {@value BrokerConstants#GOBBLIN_BROKER_CONFIG_NAMESPACES}.
+ * Each namespace is merged in as a fallback, so a key already provided by the prefix or by an earlier
+ * namespace keeps its value.
+ *
+ * @param srcConfig config to extract the broker configuration from
+ * @return the merged broker configuration, with keys relative to the prefix/namespace they were read from
+ */
+ @VisibleForTesting
+ static Config getBrokerConfig(Config srcConfig) {
+ Config allSrcConfig = srcConfig.withFallback(BROKER_NAMESPACES_FALLBACK);
+ String namespaces = allSrcConfig.getString(BrokerConstants.GOBBLIN_BROKER_CONFIG_NAMESPACES);
+ Config brokerConfig = ConfigUtils.getConfigOrEmpty(allSrcConfig, BrokerConstants.GOBBLIN_BROKER_CONFIG_PREFIX);
+
+ for (String namespace : LIST_SPLITTER.splitToList(namespaces)) {
+ brokerConfig = brokerConfig.withFallback(ConfigUtils.getConfigOrEmpty(allSrcConfig, namespace));
+ }
+
+ return brokerConfig;
+ }
+
+ /**
* Get the implicit {@link SharedResourcesBroker} in the callers thread. This is either a singleton broker configured
* from environment variables, java options, and classpath configuration options, or a specific broker injected
* elsewhere in the application.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/261819c6/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
index 1b1c3e8..2df48a9 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/broker/SharedResourcesBrokerFactoryTest.java
@@ -17,6 +17,8 @@
package org.apache.gobblin.broker;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -67,6 +69,35 @@ public class SharedResourcesBrokerFactoryTest {
Assert.assertEquals(configView.getConfig().getString("testKey"), "testValue");
}
+ /**
+ * Checks {@code SharedResourcesBrokerFactory.getBrokerConfig} with only the "gobblin.broker" prefix, with one
+ * extra namespace, and with two extra namespaces that both define the same key.
+ */
+ @Test
+ public void testGetBrokerConfig() {
+ Map<String, String> srcConfigMap = new HashMap<>();
+ srcConfigMap.put("gobblin.broker.key1", "value1");
+
+ // Test global namespace, "gobblin.broker"
+ Config brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+ Config expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1"));
+ Assert.assertEquals(brokerConfig, expectedConfig);
+
+ // Test extra namespace, "gobblin.shared"
+ srcConfigMap.put("gobblin.shared.key2", "value2");
+ srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared");
+ brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+ expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1","key2", "value2"));
+ Assert.assertEquals(brokerConfig, expectedConfig);
+
+ // Test a list of extra namespaces, configurations are respected in order
+ // NOTE(review): key2 is already in the map from the previous step, so this put is redundant.
+ srcConfigMap.put("gobblin.shared.key2", "value2");
+ // key3 is defined in both namespaces; the value from the first listed one ("gobblin.shared") is expected.
+ srcConfigMap.put("gobblin.shared.key3", "value3");
+ srcConfigMap.put("gobblin.shared2.key3", "value3x");
+ srcConfigMap.put("gobblin.shared2.key4", "value4");
+ srcConfigMap.put("gobblin.brokerNamespaces", "gobblin.shared, gobblin.shared2");
+ brokerConfig = SharedResourcesBrokerFactory.getBrokerConfig(ConfigFactory.parseMap(srcConfigMap));
+ expectedConfig = ConfigFactory.parseMap(ImmutableMap.of("key1", "value1", "key2", "value2",
+ "key3", "value3", "key4", "value4"));
+ Assert.assertEquals(brokerConfig, expectedConfig);
+ }
+
public static class ImplicitBrokerTest implements Runnable {
@Override