You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/08 17:31:16 UTC
[gobblin] branch master updated: [GOBBLIN-1444] Use Guice as DI
framework in Gobblin service
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bf37c76 [GOBBLIN-1444] Use Guice as DI framework in Gobblin service
bf37c76 is described below
commit bf37c76f6da228ae96b0c405d184617f03d2c198
Author: aprokofiev <ap...@linkedin.com>
AuthorDate: Tue Jun 8 10:31:08 2021 -0700
[GOBBLIN-1444] Use Guice as DI framework in Gobblin service
Previously, to initialize Gobblin service, we used
a mixture
of dependency injection, direct class creation and
config-based
class creation. In this change, we unify the
service initialization
by moving towards using dependency injection (DI)
with Guice everywhere.
Using DI will (1) help with unit testing; (2) allow overriding
classes in the middle of the dependency graph with company-specific
implementations; and (3) improve code readability, as dependencies
between classes become visible from the outside and explicit.
We also move away from name-based injection for
classes. Name-based
injection is useful when code needs several
different implementations
of the same interface. In our use case, we had
only one implementation
for each service that can be active. Name-based
injection was used for
company-specific overrides, but there is a better
way to do it - with
Guice module overrides.
There are still several improvements left to be
done to have a full
Guice migration, but we'll make them in separate
commits to limit the
PR size.
Closes #3281 from aplex/guice-migration
---
.../apache/gobblin/util/ClassAliasResolver.java | 16 +-
.../org/apache/gobblin/service/FlowConfigTest.java | 6 +-
.../apache/gobblin/service/FlowConfigV2Test.java | 6 +-
.../org/apache/gobblin/service/FlowStatusTest.java | 6 +-
.../service/FlowConfigResourceLocalHandler.java | 2 +
.../service/FlowConfigV2ResourceLocalHandler.java | 6 +-
.../gobblin/service/FlowConfigsResource.java | 10 +-
.../gobblin/service/FlowConfigsV2Resource.java | 8 +-
...vice.java => FlowConfigsV2ResourceHandler.java} | 16 +-
.../gobblin/service/FlowExecutionResource.java | 3 +-
.../service/FlowExecutionResourceLocalHandler.java | 2 +
.../apache/gobblin/service/FlowStatusResource.java | 9 +-
.../gobblin/service/LdapGroupOwnershipService.java | 13 +-
.../service/LocalGroupOwnershipService.java | 14 +-
.../gobblin/service/NoopGroupOwnershipService.java | 9 +-
.../gobblin/service/NoopRequesterService.java | 3 +
.../gobblin/restli/EmbeddedRestliServer.java | 2 +
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 5 +
.../runtime/spec_catalog/TopologyCatalog.java | 4 +
.../apache/gobblin/scheduler/SchedulerService.java | 8 +-
.../service/monitoring/FlowStatusGenerator.java | 8 +-
.../monitoring/FlowStatusGeneratorTest.java | 2 +-
gobblin-service/build.gradle | 7 +-
...ControllerUserDefinedMessageHandlerFactory.java | 8 +-
.../service/modules/core/GitConfigMonitor.java | 4 +
.../modules/core/GobblinServiceConfiguration.java | 115 ++++++
.../modules/core/GobblinServiceGuiceModule.java | 253 ++++++++++++
.../modules/core/GobblinServiceManager.java | 440 ++++++++-------------
.../service/modules/orchestration/DagManager.java | 27 +-
.../modules/orchestration/Orchestrator.java | 48 +--
.../GobblinServiceFlowConfigResourceHandler.java | 9 +-
.../GobblinServiceFlowConfigV2ResourceHandler.java | 43 ++
...GobblinServiceFlowExecutionResourceHandler.java | 13 +-
.../scheduler/GobblinServiceJobScheduler.java | 20 +-
.../topology/ConfigBasedTopologySpecFactory.java | 7 +-
.../service/modules/utils/InjectionNames.java | 23 +-
.../service/monitoring/FsJobStatusRetriever.java | 5 +-
.../monitoring/KafkaJobStatusMonitorFactory.java | 22 +-
.../gobblin/service/GobblinServiceManagerTest.java | 32 +-
.../service/modules/core/GobblinServiceHATest.java | 41 +-
.../modules/core/GobblinServiceRedirectTest.java | 9 +-
.../modules/orchestration/DagManagerFlowTest.java | 10 +-
.../modules/orchestration/OrchestratorTest.java | 11 +-
gradle/scripts/dependencyDefinitions.gradle | 1 +
44 files changed, 800 insertions(+), 506 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
index a255795..97708ef 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.util;
import java.util.List;
import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-
import org.reflections.Reflections;
import org.reflections.util.ConfigurationBuilder;
@@ -29,6 +27,9 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.annotation.Alias;
@@ -49,16 +50,16 @@ import org.apache.gobblin.annotation.Alias;
* </b>
*/
@Slf4j
+@ThreadSafe
public class ClassAliasResolver<T> {
// Scan all packages in the classpath with prefix gobblin, com.linkedin.gobblin when class is loaded.
// Since scan is expensive we do it only once when class is loaded.
- private static final Reflections REFLECTIONS = new Reflections(new ConfigurationBuilder().forPackages("gobblin",
- "com.linkedin.gobblin", "org.apache.gobblin"));
-
- Map<String, Class<? extends T>> aliasToClassCache;
+ private static final Reflections REFLECTIONS =
+ new Reflections(new ConfigurationBuilder().forPackages("gobblin", "com.linkedin.gobblin", "org.apache.gobblin"));
private final List<Alias> aliasObjects;
private final Class<T> subtypeOf;
+ ImmutableMap<String, Class<? extends T>> aliasToClassCache;
public ClassAliasResolver(Class<T> subTypeOf) {
Map<String, Class<? extends T>> cache = Maps.newHashMap();
@@ -108,7 +109,8 @@ public class ClassAliasResolver<T> {
return Class.forName(aliasOrClassName).asSubclass(this.subtypeOf);
} catch (ClassCastException cce) {
throw new ClassNotFoundException(
- String.format("Found class %s but it cannot be cast to %s.", aliasOrClassName, this.subtypeOf.getName()), cce);
+ String.format("Found class %s but it cannot be cast to %s.", aliasOrClassName, this.subtypeOf.getName()),
+ cce);
}
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 8876725..c2eee20 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -18,8 +18,6 @@
package org.apache.gobblin.service;
import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Map;
@@ -94,14 +92,12 @@ public class FlowConfigTest {
@Override
public void configure(Binder binder) {
binder.bind(FlowConfigsResourceHandler.class)
- .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER))
.toInstance(new FlowConfigResourceLocalHandler(flowCatalog));
// indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
// been made
binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
- binder.bind(RequesterService.class)
- .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE)).toInstance(new NoopRequesterService(config));
+ binder.bind(RequesterService.class).toInstance(new NoopRequesterService(config));
}
});
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 0710a3e..aa067c0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -119,12 +119,12 @@ public class FlowConfigV2Test {
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
- binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
+ binder.bind(FlowConfigsV2ResourceHandler.class).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
// indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
// been made
binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
- binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
- binder.bind(GroupOwnershipService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE)).toInstance(groupOwnershipService);
+ binder.bind(RequesterService.class).toInstance(_requesterService);
+ binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService);
}
});
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index e133582..16e08fc 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -34,7 +34,6 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
-import com.google.inject.name.Names;
import com.linkedin.restli.server.resources.BaseResource;
import org.apache.gobblin.configuration.State;
@@ -83,13 +82,12 @@ public class FlowStatusTest {
@BeforeClass
public void setUp() throws Exception {
JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
- final FlowStatusGenerator flowStatusGenerator =
- FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+ final FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Injector injector = Guice.createInjector(new Module() {
@Override
public void configure(Binder binder) {
- binder.bind(FlowStatusGenerator.class).annotatedWith(Names.named(FlowStatusResource.FLOW_STATUS_GENERATOR_INJECT_NAME))
+ binder.bind(FlowStatusGenerator.class)
.toInstance(flowStatusGenerator);
}
});
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 7244afb..8430acf 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -36,6 +36,7 @@ import com.linkedin.restli.server.UpdateResponse;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.inject.Inject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -64,6 +65,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
protected final ContextAwareMeter deleteFlow;
protected final ContextAwareMeter runImmediatelyFlow;
+ @Inject
public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
this.flowCatalog = flowCatalog;
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 86fead8..289ab89 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.gobblin.service;
+
import java.util.Map;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -27,6 +28,7 @@ import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
+import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -34,11 +36,13 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@Slf4j
-public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler {
+ @Inject
public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
super(flowCatalog);
}
+
@Override
/**
* Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 9ee00c0..95c0d37 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -27,9 +27,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
-import javax.inject.Inject;
-import javax.inject.Named;
-
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -38,6 +35,9 @@ import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+import javax.inject.Inject;
+import javax.inject.Named;
+
/**
* Resource for handling flow configuration requests
*/
@@ -45,19 +45,15 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class);
- public static final String INJECT_FLOW_CONFIG_RESOURCE_HANDLER = "flowConfigsResourceHandler";
- public static final String INJECT_REQUESTER_SERVICE = "requesterService";
public static final String INJECT_READY_TO_USE = "readToUse";
private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
@Inject
- @Named(INJECT_FLOW_CONFIG_RESOURCE_HANDLER)
private FlowConfigsResourceHandler flowConfigsResourceHandler;
// For getting who sends the request
@Inject
- @Named(INJECT_REQUESTER_SERVICE)
private RequesterService requesterService;
// For blocking use of this resource until it is ready
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 1ea39bc..00b49f7 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -58,10 +58,7 @@ import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
@RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> {
private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class);
- public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsV2ResourceHandler";
- public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
- public static final String INJECT_GROUP_OWNERSHIP_SERVICE = "v2GroupOwnershipService";
private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
@@ -69,12 +66,10 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null;
@Inject
- @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
- private FlowConfigsResourceHandler flowConfigsResourceHandler;
+ private FlowConfigsV2ResourceHandler flowConfigsResourceHandler;
// For getting who sends the request
@Inject
- @Named(INJECT_REQUESTER_SERVICE)
private RequesterService requesterService;
// For blocking use of this resource until it is ready
@@ -83,7 +78,6 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
private Boolean readyToUse;
@Inject
- @Named(INJECT_GROUP_OWNERSHIP_SERVICE)
private GroupOwnershipService groupOwnershipService;
public FlowConfigsV2Resource() {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
similarity index 69%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
index 639c6a1..7528cc8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
@@ -14,20 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.service;
-
-import com.typesafe.config.Config;
-import java.util.List;
-import org.apache.gobblin.annotation.Alias;
-
-@Alias("noop")
-public class NoopGroupOwnershipService extends GroupOwnershipService{
-
- public NoopGroupOwnershipService(Config config) {
- }
+package org.apache.gobblin.service;
- public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group) {
- return true;
- }
+public interface FlowConfigsV2ResourceHandler extends FlowConfigsResourceHandler {
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 9b6c7cf..5adec70 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -41,9 +41,8 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
*/
@RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
- public static final String FLOW_EXECUTION_GENERATOR_INJECT_NAME = "FlowExecutionResourceHandler";
- @Inject @javax.inject.Inject @javax.inject.Named(FLOW_EXECUTION_GENERATOR_INJECT_NAME)
+ @Inject
FlowExecutionResourceHandler flowExecutionResourceHandler;
public FlowExecutionResource() {}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index b6d30b8..f61fc96 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -30,6 +30,7 @@ import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.RestLiServiceException;
import com.linkedin.restli.server.UpdateResponse;
+import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.monitoring.FlowStatus;
@@ -42,6 +43,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
private final FlowStatusGenerator flowStatusGenerator;
+ @Inject
public FlowExecutionResourceLocalHandler(FlowStatusGenerator flowStatusGenerator) {
this.flowStatusGenerator = flowStatusGenerator;
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 062af97..48ad8e8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -20,10 +20,6 @@ package org.apache.gobblin.service;
import java.util.List;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.server.PagingContext;
@@ -34,6 +30,8 @@ import com.linkedin.restli.server.annotations.QueryParam;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
+import javax.inject.Inject;
+
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -42,10 +40,9 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
*/
@RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
- public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
public static final String MESSAGE_SEPARATOR = ", ";
- @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
+ @Inject
FlowStatusGenerator _flowStatusGenerator;
public FlowStatusResource() {}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
index f97d12e..684b385 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
@@ -16,25 +16,32 @@
*/
package org.apache.gobblin.service;
-
-import com.typesafe.config.Config;
import java.util.List;
import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
import javax.naming.NamingException;
import javax.naming.PartialResultException;
+
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.util.LdapUtils;
-import org.apache.log4j.Logger;
/**
* Queries external Active Directory service to check if the requester is part of the group
*/
@Alias("ldap")
+@Singleton
public class LdapGroupOwnershipService extends GroupOwnershipService {
LdapUtils ldapUtils;
private static final Logger logger = Logger.getLogger(LdapGroupOwnershipService.class);
+ @Inject
public LdapGroupOwnershipService(Config config) {
this.ldapUtils = new LdapUtils(config);
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
index abe3bf3..7b07142 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
@@ -16,14 +16,20 @@
*/
package org.apache.gobblin.service;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
import com.google.common.base.Splitter;
import com.google.gson.JsonObject;
import com.typesafe.config.Config;
-import java.io.IOException;
-import java.util.List;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
import org.apache.gobblin.annotation.Alias;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
-import org.apache.hadoop.fs.Path;
/**
@@ -31,11 +37,13 @@ import org.apache.hadoop.fs.Path;
* and values denote a list of group members
*/
@Alias("local")
+@Singleton
public class LocalGroupOwnershipService extends GroupOwnershipService {
public static final String GROUP_MEMBER_LIST = "groupOwnershipService.groupMembers.path";
LocalGroupOwnershipPathAlterationListener listener;
PathAlterationObserver observer;
+ @Inject
public LocalGroupOwnershipService(Config config) {
Path groupOwnershipFilePath = new Path(config.getString(GROUP_MEMBER_LIST));
try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
index 639c6a1..c76a085 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
@@ -16,14 +16,21 @@
*/
package org.apache.gobblin.service;
-import com.typesafe.config.Config;
import java.util.List;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
import org.apache.gobblin.annotation.Alias;
@Alias("noop")
+@Singleton
public class NoopGroupOwnershipService extends GroupOwnershipService{
+ @Inject
public NoopGroupOwnershipService(Config config) {
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
index 1b7c082..c38d697 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
@@ -23,12 +23,15 @@ import com.google.common.collect.Lists;
import com.linkedin.restli.server.resources.BaseResource;
import com.typesafe.config.Config;
+import javax.inject.Inject;
+
/**
* Default requester service which does not track any requester information.
*/
public class NoopRequesterService extends RequesterService {
+ @Inject
public NoopRequesterService(Config config) {
super(config);
}
diff --git a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
index 8d3452e..3919064 100644
--- a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
+++ b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
@@ -34,6 +34,7 @@ import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
+import com.google.inject.Singleton;
import com.linkedin.r2.filter.FilterChain;
import com.linkedin.r2.filter.FilterChains;
import com.linkedin.r2.filter.compression.EncodingType;
@@ -69,6 +70,7 @@ import lombok.Setter;
* * name - defaults to the name of the first resource in the resource collection.
* * injector - an {@link Injector} to inject dependencies into the Rest.li resources.
*/
+@Singleton
public class EmbeddedRestliServer extends AbstractIdleService {
private static final int MAX_PORT = 65535;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 147a86b..0b6e7a4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -34,11 +34,14 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigException;
import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
@@ -66,6 +69,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
* A service that interact with FlowSpec storage.
* The FlowSpec storage, a.k.a. {@link SpecStore} should be plugable with different implementation.
*/
+@Singleton
public class FlowCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog {
/***
@@ -97,6 +101,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
this(config, log, Optional.<MetricContext>absent(), true);
}
+ @Inject
public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
env.isInstrumentationEnabled());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 5d86768..6464d9f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -39,6 +39,8 @@ import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import org.apache.gobblin.annotation.Alpha;
@@ -65,6 +67,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@Alpha
+@Singleton
public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog {
public static final String DEFAULT_TOPOLOGYSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
@@ -88,6 +91,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
this(config, log, Optional.<MetricContext>absent(), true);
}
+ @Inject
public TopologyCatalog(Config config, GobblinInstanceEnvironment env) {
this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
env.isInstrumentationEnabled());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
index 2d0136f..285eb44 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
@@ -27,17 +27,20 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Getter;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
-import lombok.Getter;
-
/**
* A {@link com.google.common.util.concurrent.Service} wrapping a Quartz {@link Scheduler} allowing correct shutdown
* of the scheduler when {@link JobScheduler} fails to initialize.
*/
+@Singleton
public class SchedulerService extends AbstractIdleService {
@Getter
@@ -57,6 +60,7 @@ public class SchedulerService extends AbstractIdleService {
Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, Optional.of("org.quartz."))));
}
+ @Inject
public SchedulerService(Config cfg) {
this(cfg.hasPath(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) ?
cfg.getBoolean(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) :
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 7546e12..909e856 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import lombok.Builder;
+import javax.inject.Inject;
import org.apache.gobblin.annotation.Alpha;
@@ -34,13 +34,17 @@ import org.apache.gobblin.annotation.Alpha;
* Generator for {@link FlowStatus}, which relies on a {@link JobStatusRetriever}.
*/
@Alpha
-@Builder
public class FlowStatusGenerator {
public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
public static final int MAX_LOOKBACK = 100;
private final JobStatusRetriever jobStatusRetriever;
+ /**
+ * Creates a generator that reads job statuses from the given {@link JobStatusRetriever}.
+ * The constructor is annotated with {@link Inject} so that a dependency injection
+ * framework can supply the retriever.
+ *
+ * @param jobStatusRetriever the retriever stored by this generator
+ */
+ @Inject
+ public FlowStatusGenerator(JobStatusRetriever jobStatusRetriever) {
+ this.jobStatusRetriever = jobStatusRetriever;
+ }
+
/**
* Get the flow statuses of last <code>count</code> (or fewer) executions
* @param flowName
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 4515e51..01a75be 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -34,7 +34,7 @@ public class FlowStatusGeneratorTest {
String flowGroup = "testGroup";
Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
- FlowStatusGenerator flowStatusGenerator = FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+ FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
//If a flow is COMPILED, isFlowRunning() should return true.
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index d83cc57..c32885b 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -48,6 +48,7 @@ dependencies {
compile externalDependency.guava
compile externalDependency.guavaretrying
compile externalDependency.guice
+ compile externalDependency.guiceMultibindings
compile externalDependency.hadoopClientCommon
compile externalDependency.hadoopCommon
compile externalDependency.helix
@@ -128,12 +129,6 @@ configurations {
compile {
transitive = false
}
-
- testRuntime {
- resolutionStrategy {
- force 'com.google.inject:guice:3.0'
- }
- }
}
artifacts {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index 08a9cb5..41e7a33 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -37,10 +37,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
/**
@@ -51,7 +51,7 @@ import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
private boolean flowCatalogLocalCommit;
private GobblinServiceJobScheduler jobScheduler;
- private GobblinServiceFlowConfigResourceHandler resourceHandler;
+ private FlowConfigsResourceHandler resourceHandler;
private String serviceName;
@Override
@@ -80,12 +80,12 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
private static class ControllerUserDefinedMessageHandler extends MessageHandler {
private boolean flowCatalogLocalCommit;
private GobblinServiceJobScheduler jobScheduler;
- private GobblinServiceFlowConfigResourceHandler resourceHandler;
+ private FlowConfigsResourceHandler resourceHandler;
private String serviceName;
public ControllerUserDefinedMessageHandler(Message message, NotificationContext context, String serviceName,
boolean flowCatalogLocalCommit, GobblinServiceJobScheduler scheduler,
- GobblinServiceFlowConfigResourceHandler resourceHandler) {
+ FlowConfigsResourceHandler resourceHandler) {
super(message, context);
this.serviceName = serviceName;
this.flowCatalogLocalCommit = flowCatalogLocalCommit;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 7bbd4f6..5aa0b43 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -27,6 +27,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
@@ -45,6 +47,7 @@ import org.apache.gobblin.util.PullFileLoader;
* The <flowGroup> and <flowName> is used to generate the URI used to store the config in the {@link FlowCatalog}
*/
@Slf4j
+@Singleton
public class GitConfigMonitor extends GitMonitoringService {
public static final String GIT_CONFIG_MONITOR_PREFIX = "gobblin.service.gitConfigMonitor";
@@ -72,6 +75,7 @@ public class GitConfigMonitor extends GitMonitoringService {
private final FlowCatalog flowCatalog;
private final Config emptyConfig = ConfigFactory.empty();
+ @Inject
GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
this.flowCatalog = flowCatalog;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
new file mode 100644
index 0000000..0326dd7
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.util.Objects;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Immutable holder for the identity of a Gobblin service instance and for the flags that decide
+ * which of its components are enabled.
+ *
+ * <p>All flags are computed once, in the constructor, from the supplied {@link Config}. A getter is
+ * generated for every field.
+ */
+@Getter
+@ToString
+public class GobblinServiceConfiguration {
+
+  private final String serviceName;
+
+  private final String serviceId;
+
+  private final boolean isTopologyCatalogEnabled;
+
+  private final boolean isFlowCatalogEnabled;
+
+  private final boolean isSchedulerEnabled;
+
+  private final boolean isRestLIServerEnabled;
+
+  private final boolean isTopologySpecFactoryEnabled;
+
+  private final boolean isGitConfigMonitorEnabled;
+
+  private final boolean isDagManagerEnabled;
+
+  private final boolean isJobStatusMonitorEnabled;
+
+  private final boolean isHelixManagerEnabled;
+
+  private final boolean flowCatalogLocalCommit;
+
+  private final Config innerConfig;
+
+  @Nullable
+  private final Path serviceWorkDir;
+
+  /**
+   * Reads all service flags from {@code config}.
+   *
+   * @param serviceName name of the service; must not be null
+   * @param serviceId id of the service; must not be null
+   * @param config configuration the flags are read from; must not be null
+   * @param serviceWorkDir working directory of the service, may be null
+   * @throws NullPointerException if {@code serviceName}, {@code serviceId} or {@code config} is null
+   */
+  public GobblinServiceConfiguration(String serviceName, String serviceId, Config config,
+      @Nullable Path serviceWorkDir) {
+    this.serviceName = Objects.requireNonNull(serviceName, "Service name cannot be null");
+    this.serviceId = Objects.requireNonNull(serviceId, "Service id cannot be null");
+    this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
+    this.serviceWorkDir = serviceWorkDir;
+
+    this.isTopologyCatalogEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
+
+    final boolean flowCatalogOn =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
+    this.isFlowCatalogEnabled = flowCatalogOn;
+
+    // Both settings below only apply when the flow catalog is enabled; otherwise they are
+    // forced to false and the configuration is not consulted for them.
+    this.flowCatalogLocalCommit = flowCatalogOn
+        && ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT,
+            ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT);
+    this.isGitConfigMonitorEnabled = flowCatalogOn
+        && ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
+
+    // The Helix manager is enabled purely by the presence of a ZooKeeper connection string.
+    this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
+    this.isDagManagerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+    this.isJobStatusMonitorEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
+    this.isSchedulerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
+    this.isRestLIServerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
+    this.isTopologySpecFactoryEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
new file mode 100644
index 0000000..a4d1458
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.util.Objects;
+
+import org.apache.helix.HelixManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provider;
+import com.google.inject.multibindings.OptionalBinder;
+import com.google.inject.name.Names;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+
+import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsResource;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
+import org.apache.gobblin.service.FlowConfigsV2Resource;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.FlowStatusResource;
+import org.apache.gobblin.service.GroupOwnershipService;
+import org.apache.gobblin.service.NoopRequesterService;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Guice module that declares the bindings for the Gobblin service, driven by a
+ * {@link GobblinServiceConfiguration}.
+ *
+ * <p>Components that can be switched off (catalogs, DagManager, Helix manager, scheduler, job
+ * status monitor, git config monitor, Rest.li server, topology spec factory) are bound only when
+ * the corresponding flag of the configuration is set.
+ */
+public class GobblinServiceGuiceModule implements Module {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceGuiceModule.class);
+  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
+
+  GobblinServiceConfiguration serviceConfig;
+
+  /**
+   * @param serviceConfig service settings used to decide which bindings are created; must not be null
+   * @throws NullPointerException if {@code serviceConfig} is null
+   */
+  public GobblinServiceGuiceModule(GobblinServiceConfiguration serviceConfig) {
+    this.serviceConfig = Objects.requireNonNull(serviceConfig);
+  }
+
+  /**
+   * Registers all service bindings on the given binder.
+   *
+   * @param binder the Guice binder to configure
+   */
+  @Override
+  public void configure(Binder binder) {
+    LOGGER.info("Configuring bindings for the following service settings: {}", serviceConfig);
+
+    // In the current code base, we frequently inject classes instead of interfaces.
+    // As a result, even when the binding is missing, Guice will create an instance of the
+    // class and inject it. This interferes with disabling of different services and
+    // components, because without explicit bindings they will get instantiated anyway.
+    binder.requireExplicitBindings();
+
+    // Optional binder will find the existing binding for T and create an additional binding for Optional<T>.
+    // If no binding for the specific class exists, the optional will be "absent".
+    OptionalBinder.newOptionalBinder(binder, Logger.class);
+
+    binder.bind(Logger.class).toInstance(LoggerFactory.getLogger(GobblinServiceManager.class));
+
+    binder.bind(Config.class).toInstance(serviceConfig.getInnerConfig());
+
+    binder.bind(GobblinServiceConfiguration.class).toInstance(serviceConfig);
+
+    // Used by TopologyCatalog and FlowCatalog
+    GobblinInstanceEnvironment gobblinInstanceEnvironment = StandardGobblinInstanceLauncher.builder()
+        .withLog(LoggerFactory.getLogger(GobblinServiceManager.class))
+        .setInstrumentationEnabled(true)
+        .withSysConfig(serviceConfig.getInnerConfig())
+        .build();
+
+    binder.bind(GobblinInstanceEnvironment.class).toInstance(gobblinInstanceEnvironment);
+
+    binder.bind(EventBus.class)
+        .annotatedWith(Names.named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME))
+        .toInstance(new EventBus(GobblinServiceManager.class.getSimpleName()));
+
+    binder.bindConstant().annotatedWith(Names.named(InjectionNames.SERVICE_NAME)).to(serviceConfig.getServiceName());
+
+    binder.bindConstant()
+        .annotatedWith(Names.named(InjectionNames.FORCE_LEADER))
+        .to(ConfigUtils.getBoolean(serviceConfig.getInnerConfig(), ServiceConfigKeys.FORCE_LEADER,
+            ServiceConfigKeys.DEFAULT_FORCE_LEADER));
+
+    // This constant carries the boolean local-commit flag; it must be bound to the flag itself
+    // and not to any other setting such as the service name.
+    binder.bindConstant()
+        .annotatedWith(Names.named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT))
+        .to(serviceConfig.isFlowCatalogLocalCommit());
+
+    binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+    binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
+    binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+
+    binder.bind(FlowConfigsResource.class);
+    binder.bind(FlowConfigsV2Resource.class);
+    binder.bind(FlowStatusResource.class);
+    binder.bind(FlowExecutionResource.class);
+
+    binder.bind(FlowConfigResourceLocalHandler.class);
+    binder.bind(FlowConfigV2ResourceLocalHandler.class);
+    binder.bind(FlowExecutionResourceLocalHandler.class);
+
+    binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+    binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+    binder.bind(RequesterService.class)
+        .to(NoopRequesterService.class);
+
+    OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
+    if (serviceConfig.isTopologyCatalogEnabled()) {
+      binder.bind(TopologyCatalog.class);
+    }
+
+    if (serviceConfig.isTopologySpecFactoryEnabled()) {
+      binder.bind(TopologySpecFactory.class)
+          .to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
+    }
+
+    OptionalBinder.newOptionalBinder(binder, DagManager.class);
+    if (serviceConfig.isDagManagerEnabled()) {
+      binder.bind(DagManager.class);
+    }
+
+    OptionalBinder.newOptionalBinder(binder, HelixManager.class);
+    if (serviceConfig.isHelixManagerEnabled()) {
+      binder.bind(HelixManager.class)
+          .toInstance(buildHelixManager(serviceConfig.getInnerConfig(),
+              serviceConfig.getInnerConfig().getString(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY)));
+    } else {
+      LOGGER.info("No ZooKeeper connection string. Running in single instance mode.");
+    }
+
+    OptionalBinder.newOptionalBinder(binder, FlowCatalog.class);
+    if (serviceConfig.isFlowCatalogEnabled()) {
+      binder.bind(FlowCatalog.class);
+    }
+
+    if (serviceConfig.isJobStatusMonitorEnabled()) {
+      binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+    }
+
+    binder.bind(FlowStatusGenerator.class);
+
+    if (serviceConfig.isSchedulerEnabled()) {
+      binder.bind(Orchestrator.class);
+      binder.bind(SchedulerService.class);
+      binder.bind(GobblinServiceJobScheduler.class);
+    }
+
+    if (serviceConfig.isGitConfigMonitorEnabled()) {
+      binder.bind(GitConfigMonitor.class);
+    }
+
+    binder.bind(GroupOwnershipService.class)
+        .to(getClassByNameOrAlias(GroupOwnershipService.class, serviceConfig.getInnerConfig(),
+            ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS, ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE));
+
+    binder.bind(JobStatusRetriever.class)
+        .to(getClassByNameOrAlias(JobStatusRetriever.class, serviceConfig.getInnerConfig(),
+            JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
+
+    if (serviceConfig.isRestLIServerEnabled()) {
+      binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class).in(Singleton.class);
+    }
+
+    binder.bind(GobblinServiceManager.class);
+
+    LOGGER.info("Bindings configured");
+  }
+
+  /**
+   * Ensures the Helix cluster named in the config exists (created without overwrite) and builds
+   * a {@link HelixManager} for this service instance.
+   *
+   * @param config configuration providing the Helix cluster name and instance name settings
+   * @param zkConnectionString ZooKeeper connection string used for the cluster and the manager
+   * @return the Helix manager built by {@link HelixUtils}
+   */
+  protected HelixManager buildHelixManager(Config config, String zkConnectionString) {
+    String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
+    String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
+
+    LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: {}", zkConnectionString);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
+
+    return HelixUtils.buildHelixManager(helixInstanceName, helixClusterName, zkConnectionString);
+  }
+
+  /**
+   * Looks up the class configured under {@code classPropertyName}, falling back to
+   * {@code defaultClass}, after passing the configured value through a {@link ClassAliasResolver}.
+   *
+   * @param baseClass type the configured class must extend or implement
+   * @param config configuration to read the class name or alias from
+   * @param classPropertyName config key holding the class name or alias
+   * @param defaultClass value used when the key is not set
+   * @return the resolved class, checked to be a subtype of {@code baseClass}
+   * @throws RuntimeException if the class cannot be loaded or is not a subtype of {@code baseClass}
+   */
+  protected static <T> Class<? extends T> getClassByNameOrAlias(Class<T> baseClass, Config config,
+      String classPropertyName, String defaultClass) {
+    String className = ConfigUtils.getString(config, classPropertyName, defaultClass);
+    ClassAliasResolver<T> aliasResolver = new ClassAliasResolver<T>(baseClass);
+    try {
+      // asSubclass performs a checked conversion, so a misconfigured class is rejected here
+      // instead of surfacing later as an obscure failure at binding or injection time.
+      return Class.forName(aliasResolver.resolve(className)).asSubclass(baseClass);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(
+          "Cannot resolve the class '" + className + "'. Check that property '" + classPropertyName
+              + "' points to a valid class name or alias.", e);
+    } catch (ClassCastException e) {
+      throw new RuntimeException(
+          "The class '" + className + "' configured by property '" + classPropertyName
+              + "' is not a subtype of " + baseClass.getName() + ".", e);
+    }
+  }
+
+  /**
+   * Provider that builds the {@link EmbeddedRestliServer} serving {@link FlowConfigsResource} and
+   * {@link FlowConfigsV2Resource}, handing it the injector so resources can be injected.
+   */
+  public static class EmbeddedRestliServerProvider implements Provider<EmbeddedRestliServer> {
+    Injector injector;
+
+    @Inject
+    public EmbeddedRestliServerProvider(Injector injector) {
+      this.injector = injector;
+    }
+
+    @Override
+    public EmbeddedRestliServer get() {
+      return EmbeddedRestliServer.builder()
+          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
+          .injector(injector)
+          .build();
+    }
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a9518d1..9818ef5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -18,10 +18,10 @@
package org.apache.gobblin.service.modules.core;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -30,9 +30,7 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.service.GroupOwnershipService;
-import org.apache.gobblin.service.NoopGroupOwnershipService;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,16 +49,17 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.EventBus;
-import com.google.inject.Binder;
import com.google.inject.Guice;
+import com.google.inject.Inject;
import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.name.Names;
+import com.google.inject.Stage;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.RemoteInvocationException;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.annotation.Nullable;
+import javax.inject.Named;
import lombok.Getter;
import lombok.Setter;
@@ -79,37 +78,25 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
-import org.apache.gobblin.service.FlowExecutionResource;
-import org.apache.gobblin.service.FlowExecutionResourceHandler;
-import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
-import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
import org.apache.gobblin.service.FlowConfigsResource;
import org.apache.gobblin.service.FlowConfigsResourceHandler;
import org.apache.gobblin.service.FlowConfigsV2Resource;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.NoopRequesterService;
-import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.GroupOwnershipService;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
-import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
@Alpha
@@ -120,269 +107,144 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
public static final String SERVICE_NAME_OPTION_NAME = "service_name";
public static final String SERVICE_ID_OPTION_NAME = "service_id";
+ public static final String SERVICE_EVENT_BUS_NAME = "GobblinServiceManagerEventBus";
+
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class);
- private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
protected final ServiceBasedAppLauncher serviceLauncher;
private volatile boolean stopInProgress = false;
// An EventBus used for communications between services running in the ApplicationMaster
+ @Inject
+ @Named(SERVICE_EVENT_BUS_NAME)
@Getter
- protected EventBus eventBus = new EventBus(GobblinServiceManager.class.getSimpleName());
+ protected EventBus eventBus;
protected final FileSystem fs;
protected final Path serviceWorkDir;
- protected final String serviceName;
- protected final String serviceId;
-
- protected final boolean isTopologyCatalogEnabled;
- protected final boolean isFlowCatalogEnabled;
- protected final boolean isSchedulerEnabled;
- protected final boolean isRestLIServerEnabled;
- protected final boolean isTopologySpecFactoryEnabled;
- protected final boolean isGitConfigMonitorEnabled;
- protected boolean isDagManagerEnabled;
- protected final boolean isJobStatusMonitorEnabled;
+ @Getter
+ protected final GobblinServiceConfiguration configuration;
+
+ @Inject(optional = true)
protected TopologyCatalog topologyCatalog;
+
+ @Inject(optional = true)
@Getter
protected FlowCatalog flowCatalog;
+
+ @Inject(optional = true)
@Getter
protected GobblinServiceJobScheduler scheduler;
+
+ @Inject
@Getter
- protected GobblinServiceFlowConfigResourceHandler resourceHandler;
+ protected FlowConfigsResourceHandler resourceHandler;
+
+ @Inject
@Getter
- protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
+ protected FlowConfigsV2ResourceHandler v2ResourceHandler;
+
+ @Inject
@Getter
- protected GobblinServiceFlowExecutionResourceHandler flowExecutionResourceHandler;
+ protected FlowExecutionResourceHandler flowExecutionResourceHandler;
+
+ @Inject
@Getter
protected FlowStatusGenerator flowStatusGenerator;
+
+ @Inject
@Getter
protected GroupOwnershipService groupOwnershipService;
+ @Inject
+ @Getter
+ private Injector injector;
+
protected boolean flowCatalogLocalCommit;
+
+ @Inject(optional = true)
@Getter
protected Orchestrator orchestrator;
+
+ @Inject(optional = true)
protected EmbeddedRestliServer restliServer;
+
+ @Inject(optional = true)
protected TopologySpecFactory topologySpecFactory;
- protected Optional<HelixManager> helixManager;
+ @Inject
+ protected SchedulerService schedulerService;
- protected ClassAliasResolver<TopologySpecFactory> aliasResolver;
+ @Inject(optional = true)
+ protected Optional<HelixManager> helixManager;
+ @Inject(optional = true)
protected GitConfigMonitor gitConfigMonitor;
+ @Inject(optional = true)
@Getter
protected DagManager dagManager;
+ @Inject(optional = true)
protected KafkaJobStatusMonitor jobStatusMonitor;
protected Optional<HelixLeaderState> helixLeaderGauges;
- @Getter
- protected Config config;
private final MetricContext metricContext;
private final Metrics metrics;
- public GobblinServiceManager(String serviceName, String serviceId, Config config,
- Optional<Path> serviceWorkDirOptional) throws Exception {
+ @Inject
+ protected GobblinServiceManager(GobblinServiceConfiguration configuration) throws Exception {
+ this.configuration = Objects.requireNonNull(configuration);
+
+ Properties appLauncherProperties = ConfigUtils.configToProperties(
+ ConfigUtils.getConfigOrEmpty(configuration.getInnerConfig(), ServiceConfigKeys.GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX)
+ .withFallback(configuration.getInnerConfig()));
- Properties appLauncherProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX).withFallback(config));
// Done to preserve backwards compatibility with the previously hard-coded timeout of 5 minutes
- if (!appLauncherProperties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
+ // Properties.contains() (inherited from Hashtable) searches the VALUES, so a user-supplied stop time was
+ // not detected and the 300s default overwrote it; containsKey() performs the intended key lookup.
+ if (!appLauncherProperties.containsKey(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
appLauncherProperties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300));
}
- this.config = config;
- this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
- this.metrics = new Metrics(this.metricContext, config);
- this.serviceName = serviceName;
- this.serviceId = serviceId;
- this.serviceLauncher = new ServiceBasedAppLauncher(appLauncherProperties, serviceName);
-
- this.fs = buildFileSystem(config);
- this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? serviceWorkDirOptional.get()
- : getServiceWorkDirPath(this.fs, serviceName, serviceId);
-
- // Initialize TopologyCatalog
- this.isTopologyCatalogEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
- if (isTopologyCatalogEnabled) {
- this.topologyCatalog = new TopologyCatalog(config, Optional.of(LOGGER));
- this.serviceLauncher.addService(topologyCatalog);
- }
- // Initialize FlowCatalog
- this.isFlowCatalogEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
- if (isFlowCatalogEnabled) {
- this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER));
- this.flowCatalogLocalCommit = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT,
- ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT);
- this.serviceLauncher.addService(flowCatalog);
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState( configuration.getInnerConfig()), this.getClass());
+ this.metrics = new Metrics(this.metricContext, configuration.getInnerConfig());
+ this.serviceLauncher = new ServiceBasedAppLauncher(appLauncherProperties, configuration.getServiceName());
- this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
+ this.fs = buildFileSystem(configuration.getInnerConfig());
- if (this.isGitConfigMonitorEnabled) {
- this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
- this.serviceLauncher.addService(this.gitConfigMonitor);
- }
- } else {
- this.isGitConfigMonitorEnabled = false;
- }
+ this.serviceWorkDir = ObjectUtils.firstNonNull(configuration.getServiceWorkDir(),
+ getServiceWorkDirPath(this.fs, configuration.getServiceName(), configuration.getServiceId()));
- // Initialize Helix leader guage
- helixLeaderGauges = Optional.of(new HelixLeaderState());
- String helixLeaderStateGaugeName =
- MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
- ContextAwareGauge<Integer> gauge = metricContext.newContextAwareGauge(helixLeaderStateGaugeName, () -> helixLeaderGauges.get().state.getValue());
- metricContext.register(helixLeaderStateGaugeName, gauge);
-
-
- // Initialize Helix
- Optional<String> zkConnectionString = Optional.fromNullable(ConfigUtils.getString(config,
- ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null));
- if (zkConnectionString.isPresent()) {
- LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
- // This will create and register a Helix controller in ZooKeeper
- this.helixManager = Optional.fromNullable(buildHelixManager(config, zkConnectionString.get()));
- } else {
- LOGGER.info("No ZooKeeper connection string. Running in single instance mode.");
- this.helixManager = Optional.absent();
- }
-
- this.isDagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
- // Initialize DagManager
- if (this.isDagManagerEnabled) {
- this.dagManager = new DagManager(config);
- this.serviceLauncher.addService(this.dagManager);
- }
-
- this.isJobStatusMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true) ;
- // Initialize JobStatusMonitor
- if (this.isJobStatusMonitorEnabled) {
- this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
- this.serviceLauncher.addService(this.jobStatusMonitor);
- }
-
- this.flowStatusGenerator = buildFlowStatusGenerator(this.config);
-
- // Initialize ServiceScheduler
- this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
- if (isSchedulerEnabled) {
- this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), Optional.of(LOGGER));
- this.orchestrator.setFlowStatusGenerator(this.flowStatusGenerator);
-
- SchedulerService schedulerService = new SchedulerService(ConfigUtils.configToProperties(config));
+ initializeHelixLeaderGauge();
+ }
- this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager,
- Optional.of(this.flowCatalog), Optional.of(this.topologyCatalog), this.orchestrator,
- schedulerService, Optional.of(LOGGER));
- this.serviceLauncher.addService(schedulerService);
- this.serviceLauncher.addService(this.scheduler);
- }
+ /**
+ * Creates a {@link GobblinServiceManager} by wrapping the arguments in a {@link GobblinServiceConfiguration}
+ * and delegating to {@link #create(GobblinServiceConfiguration)}.
+ *
+ * @param serviceName name of the service
+ * @param serviceId id of the service
+ * @param config configuration handed to the {@link GobblinServiceConfiguration}
+ * @param serviceWorkDir service working directory; may be {@code null}
+ * @return the manager produced by {@link #create(GobblinServiceConfiguration)}
+ */
+ public static GobblinServiceManager create(String serviceName, String serviceId, Config config,
+ @Nullable Path serviceWorkDir) {
+ return create(new GobblinServiceConfiguration(serviceName, serviceId, config, serviceWorkDir));
+ }
- // Initialize RestLI
- boolean forceLeader = ConfigUtils.getBoolean(this.config, ServiceConfigKeys.FORCE_LEADER, ServiceConfigKeys.DEFAULT_FORCE_LEADER);
-
- this.resourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
- this.flowCatalogLocalCommit,
- new FlowConfigResourceLocalHandler(this.flowCatalog),
- this.helixManager,
- this.scheduler,
- forceLeader);
-
- this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
- this.flowCatalogLocalCommit,
- new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
- this.helixManager,
- this.scheduler,
- forceLeader);
-
- this.flowExecutionResourceHandler = new GobblinServiceFlowExecutionResourceHandler(new FlowExecutionResourceLocalHandler(this.flowStatusGenerator),
- this.eventBus, this.helixManager, forceLeader);
-
- this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
-
- ClassAliasResolver<GroupOwnershipService> groupOwnershipAliasResolver = new ClassAliasResolver<>(GroupOwnershipService.class);
- String groupOwnershipServiceClass = ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE;
- LOGGER.info("I am here " + groupOwnershipServiceClass);
- if (config.hasPath(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS)) {
- groupOwnershipServiceClass = config.getString(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS);
- LOGGER.info("Initializing with group ownership service " + groupOwnershipServiceClass);
- }
- this.groupOwnershipService = GobblinConstructorUtils.invokeConstructor(GroupOwnershipService.class,
- groupOwnershipAliasResolver.resolve(groupOwnershipServiceClass), config);
+ /**
+ * Builds a Guice {@link Injector} in {@link Stage#PRODUCTION} from a {@link GobblinServiceGuiceModule}
+ * created for the given configuration, and obtains the {@link GobblinServiceManager} from that injector.
+ *
+ * @param serviceConfiguration configuration passed to the Guice module
+ * @return the manager instance provided by the injector
+ */
+ public static GobblinServiceManager create(GobblinServiceConfiguration serviceConfiguration) {
+ GobblinServiceGuiceModule guiceModule = new GobblinServiceGuiceModule(serviceConfiguration);
- if (isRestLIServerEnabled) {
- Injector injector = Guice.createInjector(new Module() {
- @Override
- public void configure(Binder binder) {
- binder.bind(FlowConfigsResourceHandler.class)
- .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER))
- .toInstance(GobblinServiceManager.this.resourceHandler);
- binder.bind(FlowConfigsResourceHandler.class)
- .annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME))
- .toInstance(GobblinServiceManager.this.v2ResourceHandler);
- binder.bind(FlowExecutionResourceHandler.class)
- .annotatedWith(Names.named(FlowExecutionResource.FLOW_EXECUTION_GENERATOR_INJECT_NAME))
- .toInstance(GobblinServiceManager.this.flowExecutionResourceHandler);
- binder.bindConstant()
- .annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
- .to(Boolean.TRUE);
- binder.bindConstant()
- .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE))
- .to(Boolean.TRUE);
- binder.bind(RequesterService.class)
- .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE))
- .toInstance(new NoopRequesterService(config));
- binder.bind(RequesterService.class)
- .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
- .toInstance(new NoopRequesterService(config));
- binder.bind(GroupOwnershipService.class)
- .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE))
- .toInstance(GobblinServiceManager.this.groupOwnershipService);
- }
- });
- this.restliServer = EmbeddedRestliServer.builder()
- .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
- .injector(injector)
- .build();
- if (config.hasPath(ServiceConfigKeys.SERVICE_PORT)) {
- this.restliServer.setPort(config.getInt(ServiceConfigKeys.SERVICE_PORT));
- }
+ Injector injector = Guice.createInjector(Stage.PRODUCTION, guiceModule);
+ return injector.getInstance(GobblinServiceManager.class);
+ }
- this.serviceLauncher.addService(restliServer);
+ /**
+ * Returns the URI reported by the embedded Rest.li server.
+ *
+ * @return the value of {@code restliServer.getListeningURI()}
+ * @throws IllegalStateException if {@code restliServer} is {@code null}
+ */
+ public URI getRestLiServerListeningURI() {
+ if (restliServer == null) {
+ throw new IllegalStateException("Restli server does not exist because it was not configured or disabled");
}
+ return restliServer.getListeningURI();
+ }
- // Register Scheduler to listen to changes in Flows
- if (isSchedulerEnabled) {
- this.flowCatalog.addListener(this.scheduler);
- }
-
- // Initialize TopologySpecFactory
- this.isTopologySpecFactoryEnabled = ConfigUtils.getBoolean(config,
- ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
- if (this.isTopologySpecFactoryEnabled) {
- this.aliasResolver = new ClassAliasResolver<>(TopologySpecFactory.class);
- String topologySpecFactoryClass = ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY;
- if (config.hasPath(ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY)) {
- topologySpecFactoryClass = config.getString(ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY);
- }
-
- try {
- LOGGER.info("Using TopologySpecFactory class name/alias " + topologySpecFactoryClass);
- this.topologySpecFactory = (TopologySpecFactory) ConstructorUtils
- .invokeConstructor(Class.forName(this.aliasResolver.resolve(topologySpecFactoryClass)), config);
- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
- | InstantiationException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
+ /**
+ * Creates the {@link HelixLeaderState} holder and registers a gauge in {@code metricContext} that reports
+ * the holder's current state value. Invoked at the end of the constructor, after {@code metricContext}
+ * has been assigned.
+ */
+ private void initializeHelixLeaderGauge() {
+ helixLeaderGauges = Optional.of(new HelixLeaderState());
+ String helixLeaderStateGaugeName =
+ MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
+ ContextAwareGauge<Integer> gauge = metricContext.newContextAwareGauge(helixLeaderStateGaugeName, () -> helixLeaderGauges.get().state.getValue());
+ metricContext.register(helixLeaderStateGaugeName, gauge);
}
@VisibleForTesting
@@ -392,18 +254,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
return !helixManager.isPresent() || helixManager.get().isLeader();
}
- /**
- * Build the {@link HelixManager} for the Service Master.
- */
- private HelixManager buildHelixManager(Config config, String zkConnectionString) {
- String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
- String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
-
- LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: " + zkConnectionString);
- HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
-
- return HelixUtils.buildHelixManager(helixInstanceName, helixClusterName, zkConnectionString);
- }
private FileSystem buildFileSystem(Config config)
throws IOException {
@@ -416,18 +266,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
return new Path(fs.getHomeDirectory(), serviceName + Path.SEPARATOR + serviceId);
}
- private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
- JobStatusRetriever jobStatusRetriever;
- try {
- Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
- jobStatusRetriever =
- (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
- } catch (ReflectiveOperationException e) {
- LOGGER.error("Exception encountered when instantiating JobStatusRetriever");
- throw new RuntimeException(e);
- }
- return FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
- }
+
/**
* Handle leadership change.
@@ -438,7 +277,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());
- if (this.isSchedulerEnabled) {
+ if (configuration.isSchedulerEnabled()) {
LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
this.scheduler.setActive(true);
}
@@ -447,11 +286,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
helixLeaderGauges.get().setState(LeaderState.MASTER);
}
- if (this.isGitConfigMonitorEnabled) {
+ if (configuration.isGitConfigMonitorEnabled()) {
this.gitConfigMonitor.setActive(true);
}
- if (this.isDagManagerEnabled) {
+ if (configuration.isDagManagerEnabled()) {
//Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
if (this.topologyCatalog.getInitComplete().getCount() == 0) {
this.dagManager.setActive(true);
@@ -462,7 +301,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
this.helixManager.get().isLeader());
- if (this.isSchedulerEnabled) {
+ if (configuration.isSchedulerEnabled()) {
LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
this.scheduler.setActive(false);
}
@@ -471,21 +310,83 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
helixLeaderGauges.get().setState(LeaderState.SLAVE);
}
- if (this.isGitConfigMonitorEnabled) {
+ if (configuration.isGitConfigMonitorEnabled()) {
this.gitConfigMonitor.setActive(false);
}
- if (this.isDagManagerEnabled) {
+ if (configuration.isDagManagerEnabled()) {
this.dagManager.setActive(false);
this.eventBus.unregister(this.dagManager);
}
}
}
+ /**
+ * Adds every service whose feature flag is enabled in {@code configuration} to {@code serviceLauncher}.
+ * The git config monitor is only considered when the flow catalog is enabled as well.
+ */
+ private void registerServicesInLauncher(){
+ if (configuration.isTopologyCatalogEnabled()) {
+ this.serviceLauncher.addService(topologyCatalog);
+ }
+
+ if (configuration.isFlowCatalogEnabled()) {
+ this.serviceLauncher.addService(flowCatalog);
+
+ if (configuration.isGitConfigMonitorEnabled()) {
+ this.serviceLauncher.addService(gitConfigMonitor);
+ }
+ }
+
+ if (configuration.isDagManagerEnabled()) {
+ this.serviceLauncher.addService(dagManager);
+ }
+
+ if (configuration.isJobStatusMonitorEnabled()) {
+ this.serviceLauncher.addService(jobStatusMonitor);
+ }
+
+ if (configuration.isSchedulerEnabled()) {
+ // Both the underlying scheduler service and the Gobblin job scheduler are registered.
+ this.serviceLauncher.addService(schedulerService);
+ this.serviceLauncher.addService(scheduler);
+ }
+
+ if (configuration.isRestLIServerEnabled()) {
+ this.serviceLauncher.addService(restliServer);
+ }
+ }
+
+ /**
+ * Wires up the services before launch: builds the embedded Rest.li server when it is enabled (applying
+ * {@code ServiceConfigKeys.SERVICE_PORT} if that path is present in the config), registers the enabled
+ * services in the launcher, and adds the scheduler as a flow catalog listener when the scheduler is enabled.
+ */
+ private void configureServices(){
+ if (configuration.isRestLIServerEnabled()) {
+ this.restliServer = EmbeddedRestliServer.builder()
+ .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
+ .injector(injector)
+ .build();
+
+ if (configuration.getInnerConfig().hasPath(ServiceConfigKeys.SERVICE_PORT)) {
+ this.restliServer.setPort(configuration.getInnerConfig().getInt(ServiceConfigKeys.SERVICE_PORT));
+ }
+ }
+
+ // Must run after the Rest.li server is built above, because it adds restliServer to the launcher.
+ registerServicesInLauncher();
+
+ // Register Scheduler to listen to changes in Flows
+ if (configuration.isSchedulerEnabled()) {
+ this.flowCatalog.addListener(this.scheduler);
+ }
+ }
+
+ /**
+ * Fails fast when field injection has not happened, using a {@code null} {@code resourceHandler}
+ * (a non-optional {@code @Inject} field) as the indicator.
+ *
+ * @throws IllegalStateException if {@code resourceHandler} is {@code null}
+ */
+ private void ensureInjected() {
+ if (resourceHandler == null) {
+ throw new IllegalStateException("GobblinServiceManager should be constructed through Guice dependency injection "
+ + "or through a static factory method");
+ }
+ }
+
@Override
public void start() throws ApplicationException {
LOGGER.info("[Init] Starting the Gobblin Service Manager");
+ ensureInjected();
+
+ configureServices();
+
if (this.helixManager.isPresent()) {
connectHelixManager();
}
@@ -505,12 +406,12 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
// Update for first time since there might be no notification
if (helixManager.get().isLeader()) {
- if (this.isSchedulerEnabled) {
+ if (configuration.isSchedulerEnabled()) {
LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
this.scheduler.setActive(true);
}
- if (this.isGitConfigMonitorEnabled) {
+ if (configuration.isGitConfigMonitorEnabled()) {
this.gitConfigMonitor.setActive(true);
}
@@ -519,7 +420,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
} else {
- if (this.isSchedulerEnabled) {
+ if (configuration.isSchedulerEnabled()) {
LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler.");
}
if (helixLeaderGauges.isPresent()) {
@@ -532,14 +433,14 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
LOGGER.info("[Init] Gobblin Service is running in single instance mode, enabling Scheduler.");
this.scheduler.setActive(true);
- if (this.isGitConfigMonitorEnabled) {
+ if (configuration.isGitConfigMonitorEnabled()) {
this.gitConfigMonitor.setActive(true);
}
}
// Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
// This has to be done after the topologyCatalog service is launched
- if (this.isTopologySpecFactoryEnabled) {
+ if (configuration.isTopologySpecFactoryEnabled()) {
Collection<TopologySpec> topologySpecs = this.topologySpecFactory.getTopologies();
for (TopologySpec topologySpec : topologySpecs) {
this.topologyCatalog.put(topologySpec);
@@ -548,7 +449,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
// Register Orchestrator to listen to changes in topology
// This has to be done after topologySpecFactory has updated spec store, so that listeners will have the latest updates.
- if (isSchedulerEnabled) {
+ if (configuration.isSchedulerEnabled()) {
this.topologyCatalog.addListener(this.orchestrator);
}
@@ -560,7 +461,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
//Activate the DagManager service, after the topologyCatalog has been initialized.
if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
- if (this.isDagManagerEnabled) {
+ if (configuration.isDagManagerEnabled()) {
this.dagManager.setActive(true);
this.eventBus.register(this.dagManager);
}
@@ -592,7 +493,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
this.helixManager.get()
.getMessagingService()
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
- new ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, resourceHandler, serviceName));
+ new ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, resourceHandler,
+ configuration.getServiceName()));
}
} catch (Exception e) {
LOGGER.error("HelixManager failed to connect", e);
@@ -673,9 +575,15 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
}
Config config = ConfigFactory.load();
- try (GobblinServiceManager gobblinServiceManager = new GobblinServiceManager(
- cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
- config, Optional.<Path>absent())) {
+
+ GobblinServiceConfiguration serviceConfiguration =
+ new GobblinServiceConfiguration(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd), config,
+ null);
+
+ GobblinServiceGuiceModule guiceModule = new GobblinServiceGuiceModule(serviceConfiguration);
+ Injector injector = Guice.createInjector(guiceModule);
+
+ try (GobblinServiceManager gobblinServiceManager = injector.getInstance(GobblinServiceManager.class)) {
gobblinServiceManager.start();
if (isTestMode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 31a7361..821d87e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -51,6 +51,8 @@ import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -116,6 +118,7 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
*/
@Alpha
@Slf4j
+@Singleton
public class DagManager extends AbstractIdleService {
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
@@ -127,8 +130,6 @@ public class DagManager extends AbstractIdleService {
private static final Integer TERMINATION_TIMEOUT = 30;
public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
- private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
- private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
private static final String FAILED_DAG_RETENTION_TIME_UNIT = FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
@@ -193,7 +194,7 @@ public class DagManager extends AbstractIdleService {
private volatile boolean isActive = false;
- public DagManager(Config config, boolean instrumentationEnabled) {
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
this.config = config;
this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
this.queue = initializeDagQueue(this.numThreads);
@@ -216,25 +217,12 @@ public class DagManager extends AbstractIdleService {
this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
}
- try {
- this.jobStatusRetriever = createJobStatusRetriever(config);
- } catch (ReflectiveOperationException e) {
- throw new RuntimeException("Exception encountered during DagManager initialization", e);
- }
+ this.jobStatusRetriever = jobStatusRetriever;
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
}
- JobStatusRetriever createJobStatusRetriever(Config config) throws ReflectiveOperationException {
- Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
- return (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
- }
-
- KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws ReflectiveOperationException {
- return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
- }
-
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
try {
Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
@@ -254,8 +242,9 @@ public class DagManager extends AbstractIdleService {
return queue;
}
- public DagManager(Config config) {
- this(config, true);
+ /**
+ * Injectable constructor; delegates to {@link #DagManager(Config, JobStatusRetriever, boolean)} with
+ * {@code instrumentationEnabled} set to {@code true}.
+ */
+ @Inject
+ public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
+ this(config, jobStatusRetriever, true);
}
/** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cde02b3..635337c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
import lombok.Getter;
import lombok.Setter;
@@ -67,11 +69,8 @@ import org.apache.gobblin.service.modules.flow.SpecCompiler;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
/**
@@ -79,8 +78,8 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
* to {@link TopologyCatalog} and updates {@link SpecCompiler} state.
*/
@Alpha
+@Singleton
public class Orchestrator implements SpecCatalogListener, Instrumentable {
- private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
protected final Logger _log;
protected final SpecCompiler specCompiler;
@@ -104,12 +103,15 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
- public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
- boolean instrumentationEnabled) {
+
+ public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
+ Optional<DagManager> dagManager, Optional<Logger> log, boolean instrumentationEnabled) {
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
this.topologyCatalog = topologyCatalog;
this.dagManager = dagManager;
+ this.flowStatusGenerator = flowStatusGenerator;
+
try {
String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -146,26 +148,12 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
}
- public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
- this(config, topologyCatalog, dagManager, log, true);
- }
-
- public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Logger log) {
- this(config, topologyCatalog, dagManager, Optional.of(log));
+ /**
+ * Injectable constructor; delegates to the six-argument constructor with {@code instrumentationEnabled}
+ * set to {@code true}.
+ */
+ @Inject
+ public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
+ Optional<DagManager> dagManager, Optional<Logger> log) {
+ this(config, flowStatusGenerator, topologyCatalog, dagManager, log, true);
}
- public Orchestrator(Config config, Logger log) {
- this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.of(log));
- }
-
- /** Constructor with no logging */
- public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog) {
- this(config, topologyCatalog, Optional.<DagManager>absent(), Optional.<Logger>absent());
- }
-
- public Orchestrator(Config config) {
- this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.<Logger>absent());
- }
@VisibleForTesting
public SpecCompiler getSpecCompiler() {
@@ -401,18 +389,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
}
}
- private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
- JobStatusRetriever jobStatusRetriever;
- try {
- Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
- jobStatusRetriever =
- (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
- } catch (ReflectiveOperationException e) {
- _log.error("Exception encountered when instantiating JobStatusRetriever");
- throw new RuntimeException(e);
- }
- return FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
- }
@Nonnull
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 12ce857..f1b6e0a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -35,6 +35,8 @@ import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
import com.linkedin.restli.server.util.PatchApplier;
+import javax.inject.Inject;
+import javax.inject.Named;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -48,6 +50,7 @@ import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
/**
@@ -65,11 +68,13 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
private GobblinServiceJobScheduler jobScheduler;
private boolean forceLeader;
- public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit,
+ @Inject
+ public GobblinServiceFlowConfigResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+ @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
FlowConfigResourceLocalHandler handler,
Optional<HelixManager> manager,
GobblinServiceJobScheduler jobScheduler,
- boolean forceLeader) {
+ @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
this.flowCatalogLocalCommit = flowCatalogLocalCommit;
this.serviceName = serviceName;
this.localHandler = handler;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
new file mode 100644
index 0000000..74ed9a0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import org.apache.helix.HelixManager;
+
+import com.google.common.base.Optional;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+
+
+public class GobblinServiceFlowConfigV2ResourceHandler extends GobblinServiceFlowConfigResourceHandler
+ implements FlowConfigsV2ResourceHandler {
+
+ @Inject
+ public GobblinServiceFlowConfigV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+ @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
+ FlowConfigV2ResourceLocalHandler handler, Optional<HelixManager> manager, GobblinServiceJobScheduler jobScheduler,
+ @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
+ super(serviceName, flowCatalogLocalCommit, handler, manager, jobScheduler, forceLeader);
+ }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 538fc57..c2d845d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -29,13 +29,18 @@ import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.UpdateResponse;
+import javax.inject.Inject;
+import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.service.FlowExecution;
import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
import org.apache.gobblin.service.monitoring.KillFlowEvent;
import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
@@ -47,13 +52,15 @@ import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
*/
@Slf4j
public class GobblinServiceFlowExecutionResourceHandler implements FlowExecutionResourceHandler {
- private FlowExecutionResourceHandler localHandler;
+ private FlowExecutionResourceLocalHandler localHandler;
private EventBus eventBus;
private Optional<HelixManager> helixManager;
private boolean forceLeader;
- public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceHandler handler, EventBus eventBus,
- Optional<HelixManager> manager, boolean forceLeader) {
+ @Inject
+ public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceLocalHandler handler,
+ @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
+ Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
this.localHandler = handler;
this.eventBus = eventBus;
this.helixManager = manager;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d4cadc6..1c6d89a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -40,6 +40,9 @@ import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -61,6 +64,8 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
@@ -73,6 +78,7 @@ import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFI
* and runs them via {@link Orchestrator}.
*/
@Alpha
+@Singleton
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {
// Scheduler related configuration
@@ -105,9 +111,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
*/
public static final String DR_FILTER_TAG = "dr";
- public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
- Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
- SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+ @Inject
+ public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
+ Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
+ Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -120,11 +127,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
}
- public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
+ public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
+ Optional<HelixManager> helixManager,
Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
- SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+ SchedulerService schedulerService, Optional<Logger> log) throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, topologyCatalog, dagManager, log), schedulerService, log);
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, log);
}
public synchronized void setActive(boolean isActive) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
index d1bb267..670641e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
@@ -31,16 +31,20 @@ import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
@Alpha
+@Singleton
public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
@@ -52,6 +56,7 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
this(config, Optional.<Logger>absent());
}
+ @Inject
public ConfigBasedTopologySpecFactory(Config config, Optional<Logger> log) {
Preconditions.checkNotNull(config, "Config should not be null");
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
similarity index 65%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
copy to gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
index 639c6a1..ebe6142 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
@@ -14,20 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gobblin.service;
-import com.typesafe.config.Config;
-import java.util.List;
-import org.apache.gobblin.annotation.Alias;
+package org.apache.gobblin.service.modules.utils;
-
-@Alias("noop")
-public class NoopGroupOwnershipService extends GroupOwnershipService{
-
- public NoopGroupOwnershipService(Config config) {
- }
-
- public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group) {
- return true;
- }
+/**
+ * These names are used for dependency injection, when we need to inject different instances of the same type,
+ * or inject constants.
+ * */
+public final class InjectionNames {
+ public static final String SERVICE_NAME = "serviceName";
+ public static final String FORCE_LEADER = "forceLeader";
+ public static final String FLOW_CATALOG_LOCAL_COMMIT = "flowCatalogLocalCommit";
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index fba549f..85ee51a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -25,15 +25,14 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
+import javax.inject.Inject;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -41,7 +40,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.metrics.event.TimingEvent;
/**
@@ -56,6 +54,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
@Getter
private final FileContextBasedFsStateStore<State> stateStore;
+ @Inject
public FsJobStatusRetriever(Config config) {
this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().
createStateStore(config.getConfig(CONF_PREFIX), State.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 6e9b64b..741a246 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -21,6 +21,8 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import javax.inject.Inject;
+import javax.inject.Provider;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -34,11 +36,18 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
* A factory implementation that returns a {@link KafkaJobStatusMonitor} instance.
*/
@Slf4j
-public class KafkaJobStatusMonitorFactory {
+public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMonitor> {
private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
- public KafkaJobStatusMonitor createJobStatusMonitor(Config config)
+ Config config;
+
+ @Inject
+ public KafkaJobStatusMonitorFactory(Config config) {
+ this.config = config;
+ }
+
+ private KafkaJobStatusMonitor createJobStatusMonitor()
throws ReflectiveOperationException {
Config jobStatusConfig = config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
@@ -64,4 +73,13 @@ public class KafkaJobStatusMonitorFactory {
jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
}
+
+ @Override
+ public KafkaJobStatusMonitor get() {
+ try {
+ return createJobStatusMonitor();
+ } catch (ReflectiveOperationException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index f753e8c..ef30050 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -41,7 +41,6 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.google.gson.Gson;
@@ -49,13 +48,11 @@ import com.google.gson.GsonBuilder;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestLiResponseException;
-import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -101,7 +98,7 @@ public class GobblinServiceManagerTest {
private final URI TEST_URI = FlowSpec.Utils.createFlowSpecUri(TEST_FLOW_ID);
- private MockGobblinServiceManager gobblinServiceManager;
+ private GobblinServiceManager gobblinServiceManager;
private FlowConfigV2Client flowConfigClient;
private Git gitForPush;
@@ -168,12 +165,12 @@ public class GobblinServiceManagerTest {
this.gitForPush.commit().setMessage("First commit").call();
this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
- this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
- ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
+ this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1",
+ ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR));
this.gobblinServiceManager.start();
this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
- this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort()), transportClientProperties);
+ this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties);
}
private void cleanUpDir(String dir) throws Exception {
@@ -508,23 +505,10 @@ null, null, null, null);
private void serviceReboot() throws Exception {
this.gobblinServiceManager.stop();
- this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
- ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
- this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
- this.gobblinServiceManager.getRestLiServer().getPort()), transportClientProperties);
+ this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1",
+ ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR));
this.gobblinServiceManager.start();
- }
-
- class MockGobblinServiceManager extends GobblinServiceManager {
-
- public MockGobblinServiceManager(String serviceName, String serviceId, Config config,
- Optional<Path> serviceWorkDirOptional)
- throws Exception {
- super(serviceName, serviceId, config, serviceWorkDirOptional);
- }
-
- protected EmbeddedRestliServer getRestLiServer() {
- return this.restliServer;
- }
+ this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+ this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties);
}
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index cd10a82..4263b31 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -25,9 +25,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
-import org.jetbrains.annotations.Nullable;
-import org.mockito.Mockito;
-import org.mockito.exceptions.base.MockitoAssertionError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -35,13 +32,10 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Optional;
import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
import com.linkedin.data.template.StringMap;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.RestLiResponseException;
-import com.typesafe.config.Config;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
@@ -53,12 +47,11 @@ import org.apache.gobblin.service.FlowConfigClient;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.utils.HelixUtils;
import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
+
@Test
public class GobblinServiceHATest {
@@ -148,6 +141,7 @@ public class GobblinServiceHATest {
commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+ commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
Properties node1ServiceCoreProperties = new Properties();
node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
@@ -166,13 +160,13 @@ public class GobblinServiceHATest {
node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
// Start Node 1
- this.node1GobblinServiceManager = new TestGobblinServiceManager("CoreService1", "1",
- ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+ this.node1GobblinServiceManager = GobblinServiceManager.create("CoreService1", "1",
+ ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR));
this.node1GobblinServiceManager.start();
// Start Node 2
- this.node2GobblinServiceManager = new TestGobblinServiceManager("CoreService2", "2",
- ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+ this.node2GobblinServiceManager = GobblinServiceManager.create("CoreService2", "2",
+ ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR));
this.node2GobblinServiceManager.start();
// Initialize Node 1 Client
@@ -549,30 +543,7 @@ public class GobblinServiceHATest {
logger.info("Total failover time in ms: " + (failOverEndTime - failOverStartTime));
Assert.assertTrue(assertSuccess, "New master should take over all old master jobs.");
-
- // Check eventbus was registered with new leader
- AssertWithBackoff assertWithBackoff = AssertWithBackoff.create().logger(LoggerFactory.getLogger("checkEventbusRegistered")).timeoutMs(20000);
- assertWithBackoff.assertTrue(new com.google.common.base.Predicate<Void>() {
- @Override
- public boolean apply(@Nullable Void input) {
- try {
- Mockito.verify(secondary.getEventBus(), Mockito.atLeastOnce()).register(secondary.getDagManager());
- return true;
- } catch (MockitoAssertionError e) {
- return false;
- }
- }
- }, "Checking eventBus was registered");
-
logger.info("+++++++++++++++++++ testKillNode END");
}
- public class TestGobblinServiceManager extends GobblinServiceManager {
- public TestGobblinServiceManager(String serviceName, String serviceId, Config config, Optional<Path> serviceWorkDirOptional) throws Exception {
- super(serviceName, serviceId, config, serviceWorkDirOptional);
- this.isDagManagerEnabled = true;
- this.eventBus = Mockito.mock(EventBus.class);
- this.dagManager = Mockito.mock(DagManager.class);
- }
- }
}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index ce4cf71..003e318 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -31,7 +31,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import com.linkedin.data.template.StringMap;
@@ -167,13 +166,13 @@ public class GobblinServiceRedirectTest {
node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
// Start Node 1
- this.node1GobblinServiceManager = new GobblinServiceManager("RedirectCoreService1", "1",
- ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+ this.node1GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService1", "1",
+ ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR));
this.node1GobblinServiceManager.start();
// Start Node 2
- this.node2GobblinServiceManager = new GobblinServiceManager("RedirectCoreService2", "2",
- ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+ this.node2GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService2", "2",
+ ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR));
this.node2GobblinServiceManager.start();
// Initialize Node 1 Client
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 67ade43..92810e0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -297,11 +297,10 @@ class CancelPredicate implements Predicate<Void> {
class MockedDagManager extends DagManager {
public MockedDagManager(Config config, boolean instrumentationEnabled) {
- super(config, instrumentationEnabled);
+ super(config, createJobStatusRetriever(), instrumentationEnabled);
}
- @Override
- JobStatusRetriever createJobStatusRetriever(Config config) {
+ private static JobStatusRetriever createJobStatusRetriever() {
JobStatusRetriever mockedJbStatusRetriever = Mockito.mock(JobStatusRetriever.class);
Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).
getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
@@ -309,11 +308,6 @@ class MockedDagManager extends DagManager {
}
@Override
- KafkaJobStatusMonitor createJobStatusMonitor(Config config) {
- return null;
- }
-
- @Override
DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
DagStateStore mockedDagStateStore = Mockito.mock(DagStateStore.class);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 7087c1e..689bfa2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.service.modules.orchestration;
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
@@ -29,6 +25,8 @@ import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -45,11 +43,15 @@ import com.typesafe.config.Config;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
@@ -108,6 +110,7 @@ public class OrchestratorTest {
this.serviceLauncher.addService(flowCatalog);
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+ mock(FlowStatusGenerator.class),
Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 7de8178..e37d324 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -113,6 +113,7 @@ ext.externalDependency = [
"mysqlConnector": "mysql:mysql-connector-java:8.0.24",
"javaxInject": "javax.inject:javax.inject:1",
"guice": "com.google.inject:guice:4.0",
+ "guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0",
"guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
"derby": "org.apache.derby:derby:10.12.1.1",
"mockito": "org.mockito:mockito-core:1.10.19",