You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/08 17:31:16 UTC

[gobblin] branch master updated: [GOBBLIN-1444] Use Guice as DI framework in Gobblin service

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bf37c76  [GOBBLIN-1444] Use Guice as DI framework in Gobblin service
bf37c76 is described below

commit bf37c76f6da228ae96b0c405d184617f03d2c198
Author: aprokofiev <ap...@linkedin.com>
AuthorDate: Tue Jun 8 10:31:08 2021 -0700

    [GOBBLIN-1444] Use Guice as DI framework in Gobblin service
    
    Previously, to initialize Gobblin service, we used a mixture of
    dependency injection, direct class creation, and config-based class
    creation. In this change, we unify the service initialization by
    moving towards using dependency injection (DI) with Guice everywhere.
    
    Using DI will (1) help with unit testing; (2) help with overriding
    classes in the middle of the dependency graph with company-specific
    implementations; and (3) improve code readability, as dependencies
    between classes become visible from the outside and explicit.
    
    We also move away from name-based injection for classes. Name-based
    injection is useful when code needs several different
    implementations of the same interface. In our use case, we had only
    one implementation of each service that can be active. Name-based
    injection was used for company-specific overrides, but there is a
    better way to do that: Guice module overrides.
    
    There are still several improvements left to be done to complete the
    Guice migration, but we'll make them in separate commits to limit
    the PR size.
    
    Closes #3281 from aplex/guice-migration
---
 .../apache/gobblin/util/ClassAliasResolver.java    |  16 +-
 .../org/apache/gobblin/service/FlowConfigTest.java |   6 +-
 .../apache/gobblin/service/FlowConfigV2Test.java   |   6 +-
 .../org/apache/gobblin/service/FlowStatusTest.java |   6 +-
 .../service/FlowConfigResourceLocalHandler.java    |   2 +
 .../service/FlowConfigV2ResourceLocalHandler.java  |   6 +-
 .../gobblin/service/FlowConfigsResource.java       |  10 +-
 .../gobblin/service/FlowConfigsV2Resource.java     |   8 +-
 ...vice.java => FlowConfigsV2ResourceHandler.java} |  16 +-
 .../gobblin/service/FlowExecutionResource.java     |   3 +-
 .../service/FlowExecutionResourceLocalHandler.java |   2 +
 .../apache/gobblin/service/FlowStatusResource.java |   9 +-
 .../gobblin/service/LdapGroupOwnershipService.java |  13 +-
 .../service/LocalGroupOwnershipService.java        |  14 +-
 .../gobblin/service/NoopGroupOwnershipService.java |   9 +-
 .../gobblin/service/NoopRequesterService.java      |   3 +
 .../gobblin/restli/EmbeddedRestliServer.java       |   2 +
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |   5 +
 .../runtime/spec_catalog/TopologyCatalog.java      |   4 +
 .../apache/gobblin/scheduler/SchedulerService.java |   8 +-
 .../service/monitoring/FlowStatusGenerator.java    |   8 +-
 .../monitoring/FlowStatusGeneratorTest.java        |   2 +-
 gobblin-service/build.gradle                       |   7 +-
 ...ControllerUserDefinedMessageHandlerFactory.java |   8 +-
 .../service/modules/core/GitConfigMonitor.java     |   4 +
 .../modules/core/GobblinServiceConfiguration.java  | 115 ++++++
 .../modules/core/GobblinServiceGuiceModule.java    | 253 ++++++++++++
 .../modules/core/GobblinServiceManager.java        | 440 ++++++++-------------
 .../service/modules/orchestration/DagManager.java  |  27 +-
 .../modules/orchestration/Orchestrator.java        |  48 +--
 .../GobblinServiceFlowConfigResourceHandler.java   |   9 +-
 .../GobblinServiceFlowConfigV2ResourceHandler.java |  43 ++
 ...GobblinServiceFlowExecutionResourceHandler.java |  13 +-
 .../scheduler/GobblinServiceJobScheduler.java      |  20 +-
 .../topology/ConfigBasedTopologySpecFactory.java   |   7 +-
 .../service/modules/utils/InjectionNames.java      |  23 +-
 .../service/monitoring/FsJobStatusRetriever.java   |   5 +-
 .../monitoring/KafkaJobStatusMonitorFactory.java   |  22 +-
 .../gobblin/service/GobblinServiceManagerTest.java |  32 +-
 .../service/modules/core/GobblinServiceHATest.java |  41 +-
 .../modules/core/GobblinServiceRedirectTest.java   |   9 +-
 .../modules/orchestration/DagManagerFlowTest.java  |  10 +-
 .../modules/orchestration/OrchestratorTest.java    |  11 +-
 gradle/scripts/dependencyDefinitions.gradle        |   1 +
 44 files changed, 800 insertions(+), 506 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
index a255795..97708ef 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/util/ClassAliasResolver.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.util;
 import java.util.List;
 import java.util.Map;
 
-import lombok.extern.slf4j.Slf4j;
-
 import org.reflections.Reflections;
 import org.reflections.util.ConfigurationBuilder;
 
@@ -29,6 +27,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import javax.annotation.concurrent.ThreadSafe;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.annotation.Alias;
 
 
@@ -49,16 +50,16 @@ import org.apache.gobblin.annotation.Alias;
  * </b>
  */
 @Slf4j
+@ThreadSafe
 public class ClassAliasResolver<T> {
 
   // Scan all packages in the classpath with prefix gobblin, com.linkedin.gobblin when class is loaded.
   // Since scan is expensive we do it only once when class is loaded.
-  private static final Reflections REFLECTIONS = new Reflections(new ConfigurationBuilder().forPackages("gobblin",
-      "com.linkedin.gobblin", "org.apache.gobblin"));
-
-  Map<String, Class<? extends T>> aliasToClassCache;
+  private static final Reflections REFLECTIONS =
+      new Reflections(new ConfigurationBuilder().forPackages("gobblin", "com.linkedin.gobblin", "org.apache.gobblin"));
   private final List<Alias> aliasObjects;
   private final Class<T> subtypeOf;
+  ImmutableMap<String, Class<? extends T>> aliasToClassCache;
 
   public ClassAliasResolver(Class<T> subTypeOf) {
     Map<String, Class<? extends T>> cache = Maps.newHashMap();
@@ -108,7 +109,8 @@ public class ClassAliasResolver<T> {
       return Class.forName(aliasOrClassName).asSubclass(this.subtypeOf);
     } catch (ClassCastException cce) {
       throw new ClassNotFoundException(
-          String.format("Found class %s but it cannot be cast to %s.", aliasOrClassName, this.subtypeOf.getName()), cce);
+          String.format("Found class %s but it cannot be cast to %s.", aliasOrClassName, this.subtypeOf.getName()),
+          cce);
     }
   }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 8876725..c2eee20 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -18,8 +18,6 @@
 package org.apache.gobblin.service;
 
 import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
 import java.util.ArrayList;
 import java.util.Map;
 
@@ -94,14 +92,12 @@ public class FlowConfigTest {
        @Override
        public void configure(Binder binder) {
          binder.bind(FlowConfigsResourceHandler.class)
-             .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER))
              .toInstance(new FlowConfigResourceLocalHandler(flowCatalog));
 
          // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
          // been made
          binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
-         binder.bind(RequesterService.class)
-             .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE)).toInstance(new NoopRequesterService(config));
+         binder.bind(RequesterService.class).toInstance(new NoopRequesterService(config));
        }
     });
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
index 0710a3e..aa067c0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigV2Test.java
@@ -119,12 +119,12 @@ public class FlowConfigV2Test {
     Injector injector = Guice.createInjector(new Module() {
       @Override
       public void configure(Binder binder) {
-        binder.bind(FlowConfigsResourceHandler.class).annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME)).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
+        binder.bind(FlowConfigsV2ResourceHandler.class).toInstance(new FlowConfigV2ResourceLocalHandler(flowCatalog));
         // indicate that we are in unit testing since the resource is being blocked until flow catalog changes have
         // been made
         binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
-        binder.bind(RequesterService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE)).toInstance(_requesterService);
-        binder.bind(GroupOwnershipService.class).annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE)).toInstance(groupOwnershipService);
+        binder.bind(RequesterService.class).toInstance(_requesterService);
+        binder.bind(GroupOwnershipService.class).toInstance(groupOwnershipService);
       }
     });
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
index e133582..16e08fc 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowStatusTest.java
@@ -34,7 +34,6 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
-import com.google.inject.name.Names;
 import com.linkedin.restli.server.resources.BaseResource;
 
 import org.apache.gobblin.configuration.State;
@@ -83,13 +82,12 @@ public class FlowStatusTest {
   @BeforeClass
   public void setUp() throws Exception {
     JobStatusRetriever jobStatusRetriever = new TestJobStatusRetriever();
-    final FlowStatusGenerator flowStatusGenerator =
-        FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+    final FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
 
     Injector injector = Guice.createInjector(new Module() {
        @Override
        public void configure(Binder binder) {
-         binder.bind(FlowStatusGenerator.class).annotatedWith(Names.named(FlowStatusResource.FLOW_STATUS_GENERATOR_INJECT_NAME))
+         binder.bind(FlowStatusGenerator.class)
              .toInstance(flowStatusGenerator);
        }
     });
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 7244afb..8430acf 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -36,6 +36,7 @@ import com.linkedin.restli.server.UpdateResponse;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.inject.Inject;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -64,6 +65,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
   protected final ContextAwareMeter deleteFlow;
   protected final ContextAwareMeter runImmediatelyFlow;
 
+  @Inject
   public FlowConfigResourceLocalHandler(FlowCatalog flowCatalog) {
     this.flowCatalog = flowCatalog;
     MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index 86fead8..289ab89 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.gobblin.service;
+
 import java.util.Map;
 
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -27,6 +28,7 @@ import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 
+import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -34,11 +36,13 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 @Slf4j
-public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
+public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler {
 
+  @Inject
   public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
     super(flowCatalog);
   }
+
   @Override
   /**
    * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
index 9ee00c0..95c0d37 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java
@@ -27,9 +27,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
-import javax.inject.Inject;
-import javax.inject.Named;
-
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -38,6 +35,9 @@ import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
+import javax.inject.Inject;
+import javax.inject.Named;
+
 /**
  * Resource for handling flow configuration requests
  */
@@ -45,19 +45,15 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, EmptyRecord, FlowConfig> {
   private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsResource.class);
 
-  public static final String INJECT_FLOW_CONFIG_RESOURCE_HANDLER = "flowConfigsResourceHandler";
-  public static final String INJECT_REQUESTER_SERVICE = "requesterService";
   public static final String INJECT_READY_TO_USE = "readToUse";
 
   private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
 
   @Inject
-  @Named(INJECT_FLOW_CONFIG_RESOURCE_HANDLER)
   private FlowConfigsResourceHandler flowConfigsResourceHandler;
 
   // For getting who sends the request
   @Inject
-  @Named(INJECT_REQUESTER_SERVICE)
   private RequesterService requesterService;
 
   // For blocking use of this resource until it is ready
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 1ea39bc..00b49f7 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -58,10 +58,7 @@ import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 @RestLiCollection(name = "flowconfigsV2", namespace = "org.apache.gobblin.service", keyName = "id")
 public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, FlowStatusId, FlowConfig> {
   private static final Logger LOG = LoggerFactory.getLogger(FlowConfigsV2Resource.class);
-  public static final String FLOW_CONFIG_GENERATOR_INJECT_NAME = "flowConfigsV2ResourceHandler";
-  public static final String INJECT_REQUESTER_SERVICE = "v2RequesterService";
   public static final String INJECT_READY_TO_USE = "v2ReadyToUse";
-  public static final String INJECT_GROUP_OWNERSHIP_SERVICE = "v2GroupOwnershipService";
   private static final Set<String> ALLOWED_METADATA = ImmutableSet.of("delete.state.store");
 
 
@@ -69,12 +66,10 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
   public static FlowConfigsResourceHandler global_flowConfigsResourceHandler = null;
 
   @Inject
-  @Named(FLOW_CONFIG_GENERATOR_INJECT_NAME)
-  private FlowConfigsResourceHandler flowConfigsResourceHandler;
+  private FlowConfigsV2ResourceHandler flowConfigsResourceHandler;
 
   // For getting who sends the request
   @Inject
-  @Named(INJECT_REQUESTER_SERVICE)
   private RequesterService requesterService;
 
   // For blocking use of this resource until it is ready
@@ -83,7 +78,6 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
   private Boolean readyToUse;
 
   @Inject
-  @Named(INJECT_GROUP_OWNERSHIP_SERVICE)
   private GroupOwnershipService groupOwnershipService;
 
   public FlowConfigsV2Resource() {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
similarity index 69%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
copy to gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
index 639c6a1..7528cc8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2ResourceHandler.java
@@ -14,20 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.service;
-
-import com.typesafe.config.Config;
-import java.util.List;
-import org.apache.gobblin.annotation.Alias;
-
 
-@Alias("noop")
-public class NoopGroupOwnershipService extends GroupOwnershipService{
-
-   public NoopGroupOwnershipService(Config config) {
-   }
+package org.apache.gobblin.service;
 
-   public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group) {
-      return true;
-   }
+public interface FlowConfigsV2ResourceHandler extends FlowConfigsResourceHandler {
 }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
index 9b6c7cf..5adec70 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResource.java
@@ -41,9 +41,8 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
  */
 @RestLiCollection(name = "flowexecutions", namespace = "org.apache.gobblin.service", keyName = "id")
 public class FlowExecutionResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowExecution> {
-  public static final String FLOW_EXECUTION_GENERATOR_INJECT_NAME = "FlowExecutionResourceHandler";
 
-  @Inject @javax.inject.Inject @javax.inject.Named(FLOW_EXECUTION_GENERATOR_INJECT_NAME)
+  @Inject
   FlowExecutionResourceHandler flowExecutionResourceHandler;
 
   public FlowExecutionResource() {}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
index b6d30b8..f61fc96 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowExecutionResourceLocalHandler.java
@@ -30,6 +30,7 @@ import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.RestLiServiceException;
 import com.linkedin.restli.server.UpdateResponse;
 
+import javax.inject.Inject;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.monitoring.FlowStatus;
@@ -42,6 +43,7 @@ public class FlowExecutionResourceLocalHandler implements FlowExecutionResourceH
 
   private final FlowStatusGenerator flowStatusGenerator;
 
+  @Inject
   public FlowExecutionResourceLocalHandler(FlowStatusGenerator flowStatusGenerator) {
     this.flowStatusGenerator = flowStatusGenerator;
   }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 062af97..48ad8e8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -20,10 +20,6 @@ package org.apache.gobblin.service;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.server.PagingContext;
@@ -34,6 +30,8 @@ import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 
+import javax.inject.Inject;
+
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 
 
@@ -42,10 +40,9 @@ import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
  */
 @RestLiCollection(name = "flowstatuses", namespace = "org.apache.gobblin.service", keyName = "id")
 public class FlowStatusResource extends ComplexKeyResourceTemplate<FlowStatusId, EmptyRecord, FlowStatus> {
-  public static final String FLOW_STATUS_GENERATOR_INJECT_NAME = "FlowStatusGenerator";
   public static final String MESSAGE_SEPARATOR = ", ";
 
-  @Inject @javax.inject.Inject @javax.inject.Named(FLOW_STATUS_GENERATOR_INJECT_NAME)
+  @Inject
   FlowStatusGenerator _flowStatusGenerator;
 
   public FlowStatusResource() {}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
index f97d12e..684b385 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LdapGroupOwnershipService.java
@@ -16,25 +16,32 @@
  */
 package org.apache.gobblin.service;
 
-
-import com.typesafe.config.Config;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.log4j.Logger;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import javax.naming.NamingException;
 import javax.naming.PartialResultException;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.util.LdapUtils;
-import org.apache.log4j.Logger;
 
 
 /**
  * Queries external Active Directory service to check if the requester is part of the group
  */
 @Alias("ldap")
+@Singleton
 public class LdapGroupOwnershipService extends GroupOwnershipService {
   LdapUtils ldapUtils;
   private static final Logger logger = Logger.getLogger(LdapGroupOwnershipService.class);
 
+  @Inject
   public LdapGroupOwnershipService(Config config) {
     this.ldapUtils = new LdapUtils(config);
   }
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
index abe3bf3..7b07142 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/LocalGroupOwnershipService.java
@@ -16,14 +16,20 @@
  */
 package org.apache.gobblin.service;
 
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Splitter;
 import com.google.gson.JsonObject;
 import com.typesafe.config.Config;
-import java.io.IOException;
-import java.util.List;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.util.filesystem.PathAlterationObserver;
-import org.apache.hadoop.fs.Path;
 
 
 /**
@@ -31,11 +37,13 @@ import org.apache.hadoop.fs.Path;
  * and values denote a list of group members
  */
 @Alias("local")
+@Singleton
 public class LocalGroupOwnershipService extends GroupOwnershipService {
   public static final String GROUP_MEMBER_LIST = "groupOwnershipService.groupMembers.path";
   LocalGroupOwnershipPathAlterationListener listener;
   PathAlterationObserver observer;
 
+  @Inject
   public LocalGroupOwnershipService(Config config) {
     Path groupOwnershipFilePath = new Path(config.getString(GROUP_MEMBER_LIST));
     try {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
index 639c6a1..c76a085 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
@@ -16,14 +16,21 @@
  */
 package org.apache.gobblin.service;
 
-import com.typesafe.config.Config;
 import java.util.List;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
 import org.apache.gobblin.annotation.Alias;
 
 
 @Alias("noop")
+@Singleton
 public class NoopGroupOwnershipService extends GroupOwnershipService{
 
+   @Inject
    public NoopGroupOwnershipService(Config config) {
    }
 
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
index 1b7c082..c38d697 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopRequesterService.java
@@ -23,12 +23,15 @@ import com.google.common.collect.Lists;
 import com.linkedin.restli.server.resources.BaseResource;
 import com.typesafe.config.Config;
 
+import javax.inject.Inject;
+
 
 /**
  * Default requester service which does not track any requester information.
  */
 public class NoopRequesterService extends RequesterService {
 
+  @Inject
   public NoopRequesterService(Config config) {
     super(config);
   }
diff --git a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
index 8d3452e..3919064 100644
--- a/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
+++ b/gobblin-restli/gobblin-restli-utils/src/main/java/org/apache/gobblin/restli/EmbeddedRestliServer.java
@@ -34,6 +34,7 @@ import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Module;
+import com.google.inject.Singleton;
 import com.linkedin.r2.filter.FilterChain;
 import com.linkedin.r2.filter.FilterChains;
 import com.linkedin.r2.filter.compression.EncodingType;
@@ -69,6 +70,7 @@ import lombok.Setter;
  * * name - defaults to the name of the first resource in the resource collection.
  * * injector - an {@link Injector} to inject dependencies into the Rest.li resources.
  */
+@Singleton
 public class EmbeddedRestliServer extends AbstractIdleService {
 
   private static final int MAX_PORT = 65535;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 147a86b..0b6e7a4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -34,11 +34,14 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigException;
 
 import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -66,6 +69,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
  * A service that interact with FlowSpec storage.
  * The FlowSpec storage, a.k.a. {@link SpecStore} should be plugable with different implementation.
  */
+@Singleton
 public class FlowCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog {
 
   /***
@@ -97,6 +101,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     this(config, log, Optional.<MetricContext>absent(), true);
   }
 
+  @Inject
   public FlowCatalog(Config config, GobblinInstanceEnvironment env) {
     this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
         env.isInstrumentationEnabled());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 5d86768..6464d9f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -39,6 +39,8 @@ import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -65,6 +67,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
 
 @Alpha
+@Singleton
 public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, MutableSpecCatalog {
 
   public static final String DEFAULT_TOPOLOGYSPEC_STORE_CLASS = FSSpecStore.class.getCanonicalName();
@@ -88,6 +91,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
     this(config, log, Optional.<MetricContext>absent(), true);
   }
 
+  @Inject
   public TopologyCatalog(Config config, GobblinInstanceEnvironment env) {
     this(config, Optional.of(env.getLog()), Optional.of(env.getMetricContext()),
         env.isInstrumentationEnabled());
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
index 2d0136f..285eb44 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/scheduler/SchedulerService.java
@@ -27,17 +27,20 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import lombok.Getter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
-import lombok.Getter;
-
 
 /**
  * A {@link com.google.common.util.concurrent.Service} wrapping a Quartz {@link Scheduler} allowing correct shutdown
  * of the scheduler when {@link JobScheduler} fails to initialize.
  */
+@Singleton
 public class SchedulerService extends AbstractIdleService {
 
   @Getter
@@ -57,6 +60,7 @@ public class SchedulerService extends AbstractIdleService {
         Optional.of(PropertiesUtils.extractPropertiesWithPrefix(props, Optional.of("org.quartz."))));
   }
 
+  @Inject
   public SchedulerService(Config cfg) {
     this(cfg.hasPath(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) ?
          cfg.getBoolean(ConfigurationKeys.SCHEDULER_WAIT_FOR_JOB_COMPLETION_KEY) :
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
index 7546e12..909e856 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/service/monitoring/FlowStatusGenerator.java
@@ -25,7 +25,7 @@ import java.util.stream.Collectors;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 
-import lombok.Builder;
+import javax.inject.Inject;
 
 import org.apache.gobblin.annotation.Alpha;
 
@@ -34,13 +34,17 @@ import org.apache.gobblin.annotation.Alpha;
  * Generator for {@link FlowStatus}, which relies on a {@link JobStatusRetriever}.
  */
 @Alpha
-@Builder
 public class FlowStatusGenerator {
   public static final List<String> FINISHED_STATUSES = Lists.newArrayList("FAILED", "COMPLETE", "CANCELLED");
   public static final int MAX_LOOKBACK = 100;
 
   private final JobStatusRetriever jobStatusRetriever;
 
+  @Inject // constructor injection point: the container supplies the JobStatusRetriever
+  public FlowStatusGenerator(JobStatusRetriever jobStatusRetriever) {
+    this.jobStatusRetriever = jobStatusRetriever;
+  }
+
   /**
    * Get the flow statuses of last <code>count</code> (or fewer) executions
    * @param flowName
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
index 4515e51..01a75be 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/service/monitoring/FlowStatusGeneratorTest.java
@@ -34,7 +34,7 @@ public class FlowStatusGeneratorTest {
     String flowGroup = "testGroup";
     Mockito.when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
 
-    FlowStatusGenerator flowStatusGenerator = FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
+    FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
     Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup));
 
     //If a flow is COMPILED, isFlowRunning() should return true.
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index d83cc57..c32885b 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -48,6 +48,7 @@ dependencies {
   compile externalDependency.guava
   compile externalDependency.guavaretrying
   compile externalDependency.guice
+  compile externalDependency.guiceMultibindings
   compile externalDependency.hadoopClientCommon
   compile externalDependency.hadoopCommon
   compile externalDependency.helix
@@ -128,12 +129,6 @@ configurations {
   compile {
     transitive = false
   }
-
-  testRuntime {
-    resolutionStrategy {
-      force 'com.google.inject:guice:3.0'
-    }
-  }
 }
 
 artifacts {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
index 08a9cb5..41e7a33 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/ControllerUserDefinedMessageHandlerFactory.java
@@ -37,10 +37,10 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.restli.FlowConfigUtils;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 
 /**
@@ -51,7 +51,7 @@ import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactory {
   private boolean flowCatalogLocalCommit;
   private GobblinServiceJobScheduler jobScheduler;
-  private GobblinServiceFlowConfigResourceHandler resourceHandler;
+  private FlowConfigsResourceHandler resourceHandler;
   private String serviceName;
 
   @Override
@@ -80,12 +80,12 @@ class ControllerUserDefinedMessageHandlerFactory implements MessageHandlerFactor
   private static class ControllerUserDefinedMessageHandler extends MessageHandler {
     private boolean flowCatalogLocalCommit;
     private GobblinServiceJobScheduler jobScheduler;
-    private GobblinServiceFlowConfigResourceHandler resourceHandler;
+    private FlowConfigsResourceHandler resourceHandler;
     private String serviceName;
 
     public ControllerUserDefinedMessageHandler(Message message, NotificationContext context, String serviceName,
         boolean flowCatalogLocalCommit, GobblinServiceJobScheduler scheduler,
-        GobblinServiceFlowConfigResourceHandler resourceHandler) {
+        FlowConfigsResourceHandler resourceHandler) {
       super(message, context);
       this.serviceName = serviceName;
       this.flowCatalogLocalCommit = flowCatalogLocalCommit;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 7bbd4f6..5aa0b43 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -27,6 +27,8 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.config.ConfigBuilder;
@@ -45,6 +47,7 @@ import org.apache.gobblin.util.PullFileLoader;
  * The <flowGroup> and <flowName> is used to generate the URI used to store the config in the {@link FlowCatalog}
  */
 @Slf4j
+@Singleton
 public class GitConfigMonitor extends GitMonitoringService {
   public static final String GIT_CONFIG_MONITOR_PREFIX = "gobblin.service.gitConfigMonitor";
 
@@ -72,6 +75,7 @@ public class GitConfigMonitor extends GitMonitoringService {
   private final FlowCatalog flowCatalog;
   private final Config emptyConfig = ConfigFactory.empty();
 
+  @Inject
   GitConfigMonitor(Config config, FlowCatalog flowCatalog) {
     super(config.getConfig(GIT_CONFIG_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK));
     this.flowCatalog = flowCatalog;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
new file mode 100644
index 0000000..0326dd7
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.util.Objects;
+
+import org.apache.hadoop.fs.Path;
+
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Read-only holder of the Gobblin service settings, derived once from a {@link Config} at construction.
+ *
+ * <p>Accessors for all fields are generated by the class-level Lombok {@code @Getter}. The local-commit
+ * and Git config monitor settings are always {@code false} when the flow catalog is disabled.
+ */
+@Getter
+@ToString
+public class GobblinServiceConfiguration {
+
+  // Required: the constructor rejects null for both.
+  private final String serviceName;
+  private final String serviceId;
+
+  // Component switches read from config: all default to true except the Git config monitor and the
+  // DAG manager, which default to false.
+  private final boolean isTopologyCatalogEnabled;
+  private final boolean isFlowCatalogEnabled;
+  private final boolean isSchedulerEnabled;
+  private final boolean isRestLIServerEnabled;
+  private final boolean isTopologySpecFactoryEnabled;
+  private final boolean isGitConfigMonitorEnabled;
+  private final boolean isDagManagerEnabled;
+  private final boolean isJobStatusMonitorEnabled;
+
+  // True when the config defines a ZooKeeper connection string.
+  private final boolean isHelixManagerEnabled;
+
+  // Always false when the flow catalog is disabled.
+  private final boolean flowCatalogLocalCommit;
+
+  // The config this object was built from.
+  private final Config innerConfig;
+
+  // Null when the caller supplies no working directory.
+  @Nullable
+  private final Path serviceWorkDir;
+
+  /**
+   * Stores the identifying fields and derives every feature flag from {@code config}.
+   *
+   * @param serviceName name of the service; must not be null
+   * @param serviceId id of the service; must not be null
+   * @param config config the flags are read from; must not be null
+   * @param serviceWorkDir working directory of the service; may be null
+   * @throws NullPointerException if {@code serviceName}, {@code serviceId} or {@code config} is null
+   */
+  public GobblinServiceConfiguration(String serviceName, String serviceId, Config config,
+      @Nullable Path serviceWorkDir) {
+    this.serviceName = Objects.requireNonNull(serviceName, "Service name cannot be null");
+    this.serviceId = Objects.requireNonNull(serviceId, "Service id cannot be null");
+    this.innerConfig = Objects.requireNonNull(config, "Config cannot be null");
+    this.serviceWorkDir = serviceWorkDir;
+
+    this.isTopologyCatalogEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
+    this.isFlowCatalogEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
+
+    // The next two settings depend on the flow catalog: the short-circuit keeps them false, without
+    // consulting the config, when the catalog is disabled.
+    this.flowCatalogLocalCommit = this.isFlowCatalogEnabled
+        && ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT,
+            ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT);
+    this.isGitConfigMonitorEnabled = this.isFlowCatalogEnabled
+        && ConfigUtils.getBoolean(config,
+            ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
+
+    // Helix is treated as enabled whenever a ZooKeeper connection string is configured.
+    this.isHelixManagerEnabled = config.hasPath(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY);
+    this.isDagManagerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
+    this.isJobStatusMonitorEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true);
+    this.isSchedulerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
+    this.isRestLIServerEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
+    this.isTopologySpecFactoryEnabled =
+        ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
new file mode 100644
index 0000000..a4d1458
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.core;
+
+import java.util.Objects;
+
+import org.apache.helix.HelixManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provider;
+import com.google.inject.multibindings.OptionalBinder;
+import com.google.inject.name.Names;
+import com.typesafe.config.Config;
+
+import javax.inject.Singleton;
+
+import org.apache.gobblin.restli.EmbeddedRestliServer;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.instance.StandardGobblinInstanceLauncher;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsResource;
+import org.apache.gobblin.service.FlowConfigsResourceHandler;
+import org.apache.gobblin.service.FlowConfigsV2Resource;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResource;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
+import org.apache.gobblin.service.FlowStatusResource;
+import org.apache.gobblin.service.GroupOwnershipService;
+import org.apache.gobblin.service.NoopRequesterService;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigV2ResourceHandler;
+import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
+import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
+import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.service.monitoring.JobStatusRetriever;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
+import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/** Guice module for the Gobblin service; components disabled in the service configuration are not bound. */
+public class GobblinServiceGuiceModule implements Module {
+  private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceGuiceModule.class);
+  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
+
+  GobblinServiceConfiguration serviceConfig;
+
+  public GobblinServiceGuiceModule(GobblinServiceConfiguration serviceConfig) {
+    this.serviceConfig = Objects.requireNonNull(serviceConfig, "Service configuration cannot be null");
+  }
+
+  /** Registers the service bindings; flag-guarded components are bound only when enabled. */
+  @Override
+  public void configure(Binder binder) {
+    LOGGER.info("Configuring bindings for the following service settings: {}", serviceConfig);
+
+    // In the current code base, we frequently inject classes instead of interfaces.
+    // As a result, even when the binding is missing, Guice will create an instance of
+    // the class and inject it. This interferes with disabling of different services and
+    // components, because without explicit bindings they will get instantiated anyway.
+    binder.requireExplicitBindings();
+
+    // Optional binder will find the existing binding for T and create additional binding for Optional<T>.
+    // If none of the specific class binding exist, optional will be "absent".
+    OptionalBinder.newOptionalBinder(binder, Logger.class);
+
+    binder.bind(Logger.class).toInstance(LoggerFactory.getLogger(GobblinServiceManager.class));
+
+    binder.bind(Config.class).toInstance(serviceConfig.getInnerConfig());
+
+    binder.bind(GobblinServiceConfiguration.class).toInstance(serviceConfig);
+
+    // Used by TopologyCatalog and FlowCatalog
+    GobblinInstanceEnvironment gobblinInstanceEnvironment = StandardGobblinInstanceLauncher.builder()
+        .withLog(LoggerFactory.getLogger(GobblinServiceManager.class))
+        .setInstrumentationEnabled(true)
+        .withSysConfig(serviceConfig.getInnerConfig())
+        .build();
+
+    binder.bind(GobblinInstanceEnvironment.class).toInstance(gobblinInstanceEnvironment);
+
+    binder.bind(EventBus.class)
+        .annotatedWith(Names.named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME))
+        .toInstance(new EventBus(GobblinServiceManager.class.getSimpleName()));
+
+    binder.bindConstant().annotatedWith(Names.named(InjectionNames.SERVICE_NAME)).to(serviceConfig.getServiceName());
+
+    binder.bindConstant()
+        .annotatedWith(Names.named(InjectionNames.FORCE_LEADER))
+        .to(ConfigUtils.getBoolean(serviceConfig.getInnerConfig(), ServiceConfigKeys.FORCE_LEADER,
+            ServiceConfigKeys.DEFAULT_FORCE_LEADER));
+
+    // The local-commit switch is a boolean, taken from the parsed service configuration.
+    binder.bindConstant().annotatedWith(Names.named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT))
+        .to(serviceConfig.isFlowCatalogLocalCommit());
+
+    binder.bind(FlowConfigsResourceHandler.class).to(GobblinServiceFlowConfigResourceHandler.class);
+    binder.bind(FlowConfigsV2ResourceHandler.class).to(GobblinServiceFlowConfigV2ResourceHandler.class);
+    binder.bind(FlowExecutionResourceHandler.class).to(GobblinServiceFlowExecutionResourceHandler.class);
+
+    binder.bind(FlowConfigsResource.class);
+    binder.bind(FlowConfigsV2Resource.class);
+    binder.bind(FlowStatusResource.class);
+    binder.bind(FlowExecutionResource.class);
+
+    binder.bind(FlowConfigResourceLocalHandler.class);
+    binder.bind(FlowConfigV2ResourceLocalHandler.class);
+    binder.bind(FlowExecutionResourceLocalHandler.class);
+
+    binder.bindConstant().annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+    binder.bindConstant().annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE)).to(Boolean.TRUE);
+    binder.bind(RequesterService.class).to(NoopRequesterService.class);
+
+    OptionalBinder.newOptionalBinder(binder, TopologyCatalog.class);
+    if (serviceConfig.isTopologyCatalogEnabled()) {
+      binder.bind(TopologyCatalog.class);
+    }
+
+    if (serviceConfig.isTopologySpecFactoryEnabled()) {
+      binder.bind(TopologySpecFactory.class)
+          .to(getClassByNameOrAlias(TopologySpecFactory.class, serviceConfig.getInnerConfig(),
+              ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY, ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY));
+    }
+
+    OptionalBinder.newOptionalBinder(binder, DagManager.class);
+    if (serviceConfig.isDagManagerEnabled()) {
+      binder.bind(DagManager.class);
+    }
+
+    OptionalBinder.newOptionalBinder(binder, HelixManager.class);
+    if (serviceConfig.isHelixManagerEnabled()) {
+      binder.bind(HelixManager.class)
+          .toInstance(buildHelixManager(serviceConfig.getInnerConfig(),
+              serviceConfig.getInnerConfig().getString(ServiceConfigKeys.ZK_CONNECTION_STRING_KEY)));
+    } else {
+      LOGGER.info("No ZooKeeper connection string. Running in single instance mode.");
+    }
+
+    OptionalBinder.newOptionalBinder(binder, FlowCatalog.class);
+    if (serviceConfig.isFlowCatalogEnabled()) {
+      binder.bind(FlowCatalog.class);
+    }
+
+    if (serviceConfig.isJobStatusMonitorEnabled()) {
+      binder.bind(KafkaJobStatusMonitor.class).toProvider(KafkaJobStatusMonitorFactory.class).in(Singleton.class);
+    }
+
+    binder.bind(FlowStatusGenerator.class);
+
+    if (serviceConfig.isSchedulerEnabled()) {
+      binder.bind(Orchestrator.class);
+      binder.bind(SchedulerService.class);
+      binder.bind(GobblinServiceJobScheduler.class);
+    }
+
+    if (serviceConfig.isGitConfigMonitorEnabled()) {
+      binder.bind(GitConfigMonitor.class);
+    }
+
+    binder.bind(GroupOwnershipService.class)
+        .to(getClassByNameOrAlias(GroupOwnershipService.class, serviceConfig.getInnerConfig(),
+            ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS, ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE));
+
+    binder.bind(JobStatusRetriever.class)
+        .to(getClassByNameOrAlias(JobStatusRetriever.class, serviceConfig.getInnerConfig(),
+            JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
+
+    if (serviceConfig.isRestLIServerEnabled()) {
+      binder.bind(EmbeddedRestliServer.class).toProvider(EmbeddedRestliServerProvider.class).in(Singleton.class);
+    }
+
+    binder.bind(GobblinServiceManager.class);
+
+    LOGGER.info("Bindings configured");
+  }
+
+  /** Asks {@link HelixUtils} to create the configured cluster (overwrite = false) and to build a manager for it. */
+  protected HelixManager buildHelixManager(Config config, String zkConnectionString) {
+    String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
+    String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
+
+    LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: {}", zkConnectionString);
+    HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
+    return HelixUtils.buildHelixManager(helixInstanceName, helixClusterName, zkConnectionString);
+  }
+
+  /** Resolves the configured class name or alias to a class and checks that it extends {@code baseClass}. */
+  protected static <T> Class<? extends T> getClassByNameOrAlias(Class<T> baseClass, Config config,
+      String classPropertyName, String defaultClass) {
+    String className = ConfigUtils.getString(config, classPropertyName, defaultClass);
+    ClassAliasResolver<T> aliasResolver = new ClassAliasResolver<T>(baseClass);
+    try {
+      return Class.forName(aliasResolver.resolve(className)).asSubclass(baseClass);
+    } catch (ClassNotFoundException | ClassCastException e) {
+      throw new RuntimeException("Cannot resolve the class '" + className + "'. Check that property '"
+          + classPropertyName + "' points to a valid class name or alias.", e);
+    }
+  }
+
+  /** Builds the {@link EmbeddedRestliServer} for the flow-config resources, passing it the Guice injector. */
+  public static class EmbeddedRestliServerProvider implements Provider<EmbeddedRestliServer> {
+    Injector injector;
+
+    @Inject
+    public EmbeddedRestliServerProvider(Injector injector) {
+      this.injector = injector;
+    }
+
+    @Override
+    public EmbeddedRestliServer get() {
+      return EmbeddedRestliServer.builder()
+          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
+          .injector(injector)
+          .build();
+    }
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index a9518d1..9818ef5 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -18,10 +18,10 @@
 package org.apache.gobblin.service.modules.core;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -30,9 +30,7 @@ import org.apache.commons.cli.DefaultParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.service.GroupOwnershipService;
-import org.apache.gobblin.service.NoopGroupOwnershipService;
+import org.apache.commons.lang3.ObjectUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -51,16 +49,17 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.eventbus.EventBus;
-import com.google.inject.Binder;
 import com.google.inject.Guice;
+import com.google.inject.Inject;
 import com.google.inject.Injector;
-import com.google.inject.Module;
-import com.google.inject.name.Names;
+import com.google.inject.Stage;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.r2.RemoteInvocationException;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.annotation.Nullable;
+import javax.inject.Named;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -79,37 +78,25 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
-import org.apache.gobblin.service.FlowExecutionResource;
-import org.apache.gobblin.service.FlowExecutionResourceHandler;
-import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigClient;
-import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
-import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
 import org.apache.gobblin.service.FlowConfigsResource;
 import org.apache.gobblin.service.FlowConfigsResourceHandler;
 import org.apache.gobblin.service.FlowConfigsV2Resource;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceHandler;
 import org.apache.gobblin.service.FlowId;
-import org.apache.gobblin.service.NoopRequesterService;
-import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.GroupOwnershipService;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowConfigResourceHandler;
-import org.apache.gobblin.service.modules.restli.GobblinServiceFlowExecutionResourceHandler;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
-import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitor;
-import org.apache.gobblin.service.monitoring.KafkaJobStatusMonitorFactory;
-import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 @Alpha
@@ -120,269 +107,144 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
   public static final String SERVICE_NAME_OPTION_NAME = "service_name";
   public static final String SERVICE_ID_OPTION_NAME = "service_id";
 
+  public static final String SERVICE_EVENT_BUS_NAME = "GobblinServiceManagerEventBus";
+
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinServiceManager.class);
-  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
 
   protected final ServiceBasedAppLauncher serviceLauncher;
   private volatile boolean stopInProgress = false;
 
   // An EventBus used for communications between services running in the ApplicationMaster
+  @Inject
+  @Named(SERVICE_EVENT_BUS_NAME)
   @Getter
-  protected EventBus eventBus = new EventBus(GobblinServiceManager.class.getSimpleName());
+  protected EventBus eventBus;
 
   protected final FileSystem fs;
   protected final Path serviceWorkDir;
-  protected final String serviceName;
-  protected final String serviceId;
-
-  protected final boolean isTopologyCatalogEnabled;
-  protected final boolean isFlowCatalogEnabled;
-  protected final boolean isSchedulerEnabled;
-  protected final boolean isRestLIServerEnabled;
-  protected final boolean isTopologySpecFactoryEnabled;
-  protected final boolean isGitConfigMonitorEnabled;
-  protected boolean isDagManagerEnabled;
-  protected final boolean isJobStatusMonitorEnabled;
 
+  @Getter
+  protected final GobblinServiceConfiguration configuration;
+
+  @Inject(optional = true)
   protected TopologyCatalog topologyCatalog;
+
+  @Inject(optional = true)
   @Getter
   protected FlowCatalog flowCatalog;
+
+  @Inject(optional = true)
   @Getter
   protected GobblinServiceJobScheduler scheduler;
+
+  @Inject
   @Getter
-  protected GobblinServiceFlowConfigResourceHandler resourceHandler;
+  protected FlowConfigsResourceHandler resourceHandler;
+
+  @Inject
   @Getter
-  protected GobblinServiceFlowConfigResourceHandler v2ResourceHandler;
+  protected FlowConfigsV2ResourceHandler v2ResourceHandler;
+
+  @Inject
   @Getter
-  protected GobblinServiceFlowExecutionResourceHandler flowExecutionResourceHandler;
+  protected FlowExecutionResourceHandler flowExecutionResourceHandler;
+
+  @Inject
   @Getter
   protected FlowStatusGenerator flowStatusGenerator;
+
+  @Inject
   @Getter
   protected GroupOwnershipService groupOwnershipService;
 
+  @Inject
+  @Getter
+  private Injector injector;
+
   protected boolean flowCatalogLocalCommit;
+
+  @Inject(optional = true)
   @Getter
   protected Orchestrator orchestrator;
+
+  @Inject(optional = true)
   protected EmbeddedRestliServer restliServer;
+
+  @Inject(optional = true)
   protected TopologySpecFactory topologySpecFactory;
 
-  protected Optional<HelixManager> helixManager;
+  @Inject
+  protected SchedulerService schedulerService;
 
-  protected ClassAliasResolver<TopologySpecFactory> aliasResolver;
+  @Inject(optional = true)
+  protected Optional<HelixManager> helixManager;
 
+  @Inject(optional = true)
   protected GitConfigMonitor gitConfigMonitor;
 
+  @Inject(optional = true)
   @Getter
   protected DagManager dagManager;
 
+  @Inject(optional = true)
   protected KafkaJobStatusMonitor jobStatusMonitor;
 
   protected Optional<HelixLeaderState> helixLeaderGauges;
 
 
-  @Getter
-  protected Config config;
   private final MetricContext metricContext;
   private final Metrics metrics;
 
-  public GobblinServiceManager(String serviceName, String serviceId, Config config,
-      Optional<Path> serviceWorkDirOptional) throws Exception {
+  @Inject
+  protected GobblinServiceManager(GobblinServiceConfiguration configuration) throws Exception {
+    this.configuration = Objects.requireNonNull(configuration);
+
+    Properties appLauncherProperties = ConfigUtils.configToProperties(
+        ConfigUtils.getConfigOrEmpty(configuration.getInnerConfig(), ServiceConfigKeys.GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX)
+            .withFallback(configuration.getInnerConfig()));
 
-    Properties appLauncherProperties = ConfigUtils.configToProperties(ConfigUtils.getConfigOrEmpty(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_APP_LAUNCHER_PREFIX).withFallback(config));
     // Done to preserve backwards compatibility with the previously hard-coded timeout of 5 minutes
     if (!appLauncherProperties.contains(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS)) {
       appLauncherProperties.setProperty(ServiceBasedAppLauncher.APP_STOP_TIME_SECONDS, Long.toString(300));
     }
-    this.config = config;
-    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass());
-    this.metrics = new Metrics(this.metricContext, config);
-    this.serviceName = serviceName;
-    this.serviceId = serviceId;
-    this.serviceLauncher = new ServiceBasedAppLauncher(appLauncherProperties, serviceName);
-
-    this.fs = buildFileSystem(config);
-    this.serviceWorkDir = serviceWorkDirOptional.isPresent() ? serviceWorkDirOptional.get()
-        : getServiceWorkDirPath(this.fs, serviceName, serviceId);
-
-    // Initialize TopologyCatalog
-    this.isTopologyCatalogEnabled = ConfigUtils.getBoolean(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY, true);
-    if (isTopologyCatalogEnabled) {
-      this.topologyCatalog = new TopologyCatalog(config, Optional.of(LOGGER));
-      this.serviceLauncher.addService(topologyCatalog);
-    }
 
-    // Initialize FlowCatalog
-    this.isFlowCatalogEnabled = ConfigUtils.getBoolean(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY, true);
-    if (isFlowCatalogEnabled) {
-      this.flowCatalog = new FlowCatalog(config, Optional.of(LOGGER));
-      this.flowCatalogLocalCommit = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT,
-          ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT);
-      this.serviceLauncher.addService(flowCatalog);
+    this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState( configuration.getInnerConfig()), this.getClass());
+    this.metrics = new Metrics(this.metricContext, configuration.getInnerConfig());
+    this.serviceLauncher = new ServiceBasedAppLauncher(appLauncherProperties, configuration.getServiceName());
 
-      this.isGitConfigMonitorEnabled = ConfigUtils.getBoolean(config,
-          ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
+    this.fs = buildFileSystem(configuration.getInnerConfig());
 
-      if (this.isGitConfigMonitorEnabled) {
-        this.gitConfigMonitor = new GitConfigMonitor(config, this.flowCatalog);
-        this.serviceLauncher.addService(this.gitConfigMonitor);
-      }
-    } else {
-      this.isGitConfigMonitorEnabled = false;
-    }
+    this.serviceWorkDir = ObjectUtils.firstNonNull(configuration.getServiceWorkDir(),
+        getServiceWorkDirPath(this.fs, configuration.getServiceName(), configuration.getServiceId()));
 
-    // Initialize Helix leader guage
-    helixLeaderGauges = Optional.of(new HelixLeaderState());
-    String helixLeaderStateGaugeName =
-        MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
-    ContextAwareGauge<Integer> gauge = metricContext.newContextAwareGauge(helixLeaderStateGaugeName, () -> helixLeaderGauges.get().state.getValue());
-    metricContext.register(helixLeaderStateGaugeName, gauge);
-
-
-    // Initialize Helix
-    Optional<String> zkConnectionString = Optional.fromNullable(ConfigUtils.getString(config,
-        ServiceConfigKeys.ZK_CONNECTION_STRING_KEY, null));
-    if (zkConnectionString.isPresent()) {
-      LOGGER.info("Using ZooKeeper connection string: " + zkConnectionString);
-      // This will create and register a Helix controller in ZooKeeper
-      this.helixManager = Optional.fromNullable(buildHelixManager(config, zkConnectionString.get()));
-    } else {
-      LOGGER.info("No ZooKeeper connection string. Running in single instance mode.");
-      this.helixManager = Optional.absent();
-    }
-
-    this.isDagManagerEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_DAG_MANAGER_ENABLED_KEY, false);
-    // Initialize DagManager
-    if (this.isDagManagerEnabled) {
-      this.dagManager = new DagManager(config);
-      this.serviceLauncher.addService(this.dagManager);
-    }
-
-    this.isJobStatusMonitorEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, true) ;
-    // Initialize JobStatusMonitor
-    if (this.isJobStatusMonitorEnabled) {
-      this.jobStatusMonitor = new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
-      this.serviceLauncher.addService(this.jobStatusMonitor);
-    }
-
-    this.flowStatusGenerator = buildFlowStatusGenerator(this.config);
-
-    // Initialize ServiceScheduler
-    this.isSchedulerEnabled = ConfigUtils.getBoolean(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY, true);
-    if (isSchedulerEnabled) {
-      this.orchestrator = new Orchestrator(config, Optional.of(this.topologyCatalog), Optional.fromNullable(this.dagManager), Optional.of(LOGGER));
-      this.orchestrator.setFlowStatusGenerator(this.flowStatusGenerator);
-
-      SchedulerService schedulerService = new SchedulerService(ConfigUtils.configToProperties(config));
+    initializeHelixLeaderGauge();
+  }
 
-      this.scheduler = new GobblinServiceJobScheduler(this.serviceName, config, this.helixManager,
-          Optional.of(this.flowCatalog), Optional.of(this.topologyCatalog), this.orchestrator,
-          schedulerService, Optional.of(LOGGER));
-      this.serviceLauncher.addService(schedulerService);
-      this.serviceLauncher.addService(this.scheduler);
-    }
+  public static GobblinServiceManager create(String serviceName, String serviceId, Config config,
+      @Nullable Path serviceWorkDir) {
+    return create(new GobblinServiceConfiguration(serviceName, serviceId, config, serviceWorkDir));
+  }
 
-    // Initialize RestLI
-    boolean forceLeader = ConfigUtils.getBoolean(this.config, ServiceConfigKeys.FORCE_LEADER, ServiceConfigKeys.DEFAULT_FORCE_LEADER);
-
-    this.resourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
-        this.flowCatalogLocalCommit,
-        new FlowConfigResourceLocalHandler(this.flowCatalog),
-        this.helixManager,
-        this.scheduler,
-        forceLeader);
-
-    this.v2ResourceHandler = new GobblinServiceFlowConfigResourceHandler(serviceName,
-        this.flowCatalogLocalCommit,
-        new FlowConfigV2ResourceLocalHandler(this.flowCatalog),
-        this.helixManager,
-        this.scheduler,
-        forceLeader);
-
-    this.flowExecutionResourceHandler = new GobblinServiceFlowExecutionResourceHandler(new FlowExecutionResourceLocalHandler(this.flowStatusGenerator),
-        this.eventBus, this.helixManager, forceLeader);
-
-    this.isRestLIServerEnabled = ConfigUtils.getBoolean(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY, true);
-
-    ClassAliasResolver<GroupOwnershipService> groupOwnershipAliasResolver = new ClassAliasResolver<>(GroupOwnershipService.class);
-    String groupOwnershipServiceClass = ServiceConfigKeys.DEFAULT_GROUP_OWNERSHIP_SERVICE;
-    LOGGER.info("I am here " + groupOwnershipServiceClass);
-    if (config.hasPath(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS)) {
-      groupOwnershipServiceClass = config.getString(ServiceConfigKeys.GROUP_OWNERSHIP_SERVICE_CLASS);
-      LOGGER.info("Initializing with group ownership service " + groupOwnershipServiceClass);
-    }
-     this.groupOwnershipService = GobblinConstructorUtils.invokeConstructor(GroupOwnershipService.class,
-          groupOwnershipAliasResolver.resolve(groupOwnershipServiceClass), config);
+  public static GobblinServiceManager create(GobblinServiceConfiguration serviceConfiguration) {
+    GobblinServiceGuiceModule guiceModule = new GobblinServiceGuiceModule(serviceConfiguration);
 
-    if (isRestLIServerEnabled) {
-      Injector injector = Guice.createInjector(new Module() {
-        @Override
-        public void configure(Binder binder) {
-          binder.bind(FlowConfigsResourceHandler.class)
-              .annotatedWith(Names.named(FlowConfigsResource.INJECT_FLOW_CONFIG_RESOURCE_HANDLER))
-              .toInstance(GobblinServiceManager.this.resourceHandler);
-          binder.bind(FlowConfigsResourceHandler.class)
-              .annotatedWith(Names.named(FlowConfigsV2Resource.FLOW_CONFIG_GENERATOR_INJECT_NAME))
-              .toInstance(GobblinServiceManager.this.v2ResourceHandler);
-          binder.bind(FlowExecutionResourceHandler.class)
-              .annotatedWith(Names.named(FlowExecutionResource.FLOW_EXECUTION_GENERATOR_INJECT_NAME))
-              .toInstance(GobblinServiceManager.this.flowExecutionResourceHandler);
-          binder.bindConstant()
-              .annotatedWith(Names.named(FlowConfigsResource.INJECT_READY_TO_USE))
-              .to(Boolean.TRUE);
-          binder.bindConstant()
-              .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_READY_TO_USE))
-              .to(Boolean.TRUE);
-          binder.bind(RequesterService.class)
-              .annotatedWith(Names.named(FlowConfigsResource.INJECT_REQUESTER_SERVICE))
-              .toInstance(new NoopRequesterService(config));
-          binder.bind(RequesterService.class)
-              .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_REQUESTER_SERVICE))
-              .toInstance(new NoopRequesterService(config));
-          binder.bind(GroupOwnershipService.class)
-              .annotatedWith(Names.named(FlowConfigsV2Resource.INJECT_GROUP_OWNERSHIP_SERVICE))
-              .toInstance(GobblinServiceManager.this.groupOwnershipService);
-        }
-      });
-      this.restliServer = EmbeddedRestliServer.builder()
-          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
-          .injector(injector)
-          .build();
-      if (config.hasPath(ServiceConfigKeys.SERVICE_PORT)) {
-        this.restliServer.setPort(config.getInt(ServiceConfigKeys.SERVICE_PORT));
-      }
+    Injector injector = Guice.createInjector(Stage.PRODUCTION, guiceModule);
+    return injector.getInstance(GobblinServiceManager.class);
+  }
 
-      this.serviceLauncher.addService(restliServer);
+  public URI getRestLiServerListeningURI() {
+    if (restliServer == null) {
+      throw new IllegalStateException("Restli server does not exist because it was not configured or disabled");
     }
+    return restliServer.getListeningURI();
+  }
 
-    // Register Scheduler to listen to changes in Flows
-    if (isSchedulerEnabled) {
-      this.flowCatalog.addListener(this.scheduler);
-    }
-
-    // Initialize TopologySpecFactory
-    this.isTopologySpecFactoryEnabled = ConfigUtils.getBoolean(config,
-        ServiceConfigKeys.GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY, true);
-    if (this.isTopologySpecFactoryEnabled) {
-      this.aliasResolver = new ClassAliasResolver<>(TopologySpecFactory.class);
-      String topologySpecFactoryClass = ServiceConfigKeys.DEFAULT_TOPOLOGY_SPEC_FACTORY;
-      if (config.hasPath(ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY)) {
-        topologySpecFactoryClass = config.getString(ServiceConfigKeys.TOPOLOGYSPEC_FACTORY_KEY);
-      }
-
-      try {
-        LOGGER.info("Using TopologySpecFactory class name/alias " + topologySpecFactoryClass);
-        this.topologySpecFactory = (TopologySpecFactory) ConstructorUtils
-            .invokeConstructor(Class.forName(this.aliasResolver.resolve(topologySpecFactoryClass)), config);
-      } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException
-          | InstantiationException | ClassNotFoundException e) {
-        throw new RuntimeException(e);
-      }
-    }
+  /** Creates the Helix leader-state holder and registers a gauge in the metric context that reports its value. */
+  private void initializeHelixLeaderGauge() {
+    this.helixLeaderGauges = Optional.of(new HelixLeaderState());
+    String gaugeName = MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, ServiceMetricNames.HELIX_LEADER_STATE);
+    ContextAwareGauge<Integer> leaderGauge = this.metricContext.newContextAwareGauge(gaugeName, () -> this.helixLeaderGauges.get().state.getValue());
+    this.metricContext.register(gaugeName, leaderGauge);
   }
 
   @VisibleForTesting
@@ -392,18 +254,6 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     return !helixManager.isPresent() || helixManager.get().isLeader();
   }
 
-  /**
-   * Build the {@link HelixManager} for the Service Master.
-   */
-  private HelixManager buildHelixManager(Config config, String zkConnectionString) {
-    String helixClusterName = config.getString(ServiceConfigKeys.HELIX_CLUSTER_NAME_KEY);
-    String helixInstanceName = HelixUtils.buildHelixInstanceName(config, GobblinServiceManager.class.getSimpleName());
-
-    LOGGER.info("Creating Helix cluster if not already present [overwrite = false]: " + zkConnectionString);
-    HelixUtils.createGobblinHelixCluster(zkConnectionString, helixClusterName, false);
-
-    return HelixUtils.buildHelixManager(helixInstanceName, helixClusterName, zkConnectionString);
-  }
 
   private FileSystem buildFileSystem(Config config)
       throws IOException {
@@ -416,18 +266,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     return new Path(fs.getHomeDirectory(), serviceName + Path.SEPARATOR + serviceId);
   }
 
-  private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
-    JobStatusRetriever jobStatusRetriever;
-    try {
-      Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
-      jobStatusRetriever =
-          (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
-    } catch (ReflectiveOperationException e) {
-      LOGGER.error("Exception encountered when instantiating JobStatusRetriever");
-      throw new RuntimeException(e);
-    }
-    return FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
-  }
+
 
   /**
    * Handle leadership change.
@@ -438,7 +277,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       LOGGER.info("Leader notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
 
-      if (this.isSchedulerEnabled) {
+      if (configuration.isSchedulerEnabled()) {
         LOGGER.info("Gobblin Service is now running in master instance mode, enabling Scheduler.");
         this.scheduler.setActive(true);
       }
@@ -447,11 +286,11 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         helixLeaderGauges.get().setState(LeaderState.MASTER);
       }
 
-      if (this.isGitConfigMonitorEnabled) {
+      if (configuration.isGitConfigMonitorEnabled()) {
         this.gitConfigMonitor.setActive(true);
       }
 
-      if (this.isDagManagerEnabled) {
+      if (configuration.isDagManagerEnabled()) {
         //Activate DagManager only if TopologyCatalog is initialized. If not; skip activation.
         if (this.topologyCatalog.getInitComplete().getCount() == 0) {
           this.dagManager.setActive(true);
@@ -462,7 +301,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       LOGGER.info("Leader lost notification for {} HM.isLeader {}", this.helixManager.get().getInstanceName(),
           this.helixManager.get().isLeader());
 
-      if (this.isSchedulerEnabled) {
+      if (configuration.isSchedulerEnabled()) {
         LOGGER.info("Gobblin Service is now running in slave instance mode, disabling Scheduler.");
         this.scheduler.setActive(false);
       }
@@ -471,21 +310,83 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         helixLeaderGauges.get().setState(LeaderState.SLAVE);
       }
 
-      if (this.isGitConfigMonitorEnabled) {
+      if (configuration.isGitConfigMonitorEnabled()) {
         this.gitConfigMonitor.setActive(false);
       }
 
-      if (this.isDagManagerEnabled) {
+      if (configuration.isDagManagerEnabled()) {
         this.dagManager.setActive(false);
         this.eventBus.unregister(this.dagManager);
       }
     }
   }
 
+  /** Adds every service that is enabled in the configuration to the service launcher, in a fixed order. */
+  private void registerServicesInLauncher() {
+    final ServiceBasedAppLauncher launcher = this.serviceLauncher;
+    // Catalogs first; the git config monitor is only registered when the flow catalog is enabled too.
+    if (configuration.isTopologyCatalogEnabled()) {
+      launcher.addService(topologyCatalog);
+    }
+    if (configuration.isFlowCatalogEnabled()) {
+      launcher.addService(flowCatalog);
+      if (configuration.isGitConfigMonitorEnabled()) {
+        launcher.addService(gitConfigMonitor);
+      }
+    }
+    // DAG manager and job status monitor.
+    if (configuration.isDagManagerEnabled()) {
+      launcher.addService(dagManager);
+    }
+    if (configuration.isJobStatusMonitorEnabled()) {
+      launcher.addService(jobStatusMonitor);
+    }
+    // The SchedulerService is added before the GobblinServiceJobScheduler.
+    if (configuration.isSchedulerEnabled()) {
+      launcher.addService(schedulerService);
+      launcher.addService(scheduler);
+    }
+    // The REST.li server is registered last.
+    if (configuration.isRestLIServerEnabled()) {
+      launcher.addService(restliServer);
+    }
+  }
+
+  /** Builds the REST.li server when enabled, registers services in the launcher, then wires the scheduler listener. */
+  private void configureServices() {
+    if (configuration.isRestLIServerEnabled()) {
+      restliServer = EmbeddedRestliServer.builder()
+          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
+          .injector(injector).build();
+      // Only override the server port when the service config provides one explicitly.
+      Config innerConfig = configuration.getInnerConfig();
+      if (innerConfig.hasPath(ServiceConfigKeys.SERVICE_PORT)) {
+        restliServer.setPort(innerConfig.getInt(ServiceConfigKeys.SERVICE_PORT));
+      }
+    }
+    // Registration happens after the REST.li server is built, because the server is one of the registered services.
+    registerServicesInLauncher();
+    // Subscribe the scheduler to changes in the flow catalog.
+    if (configuration.isSchedulerEnabled()) {
+      flowCatalog.addListener(scheduler);
+    }
+  }
+
+  /** Fails fast if Guice never populated this instance; {@code resourceHandler} is a mandatory injected field. */
+  private void ensureInjected() {
+    if (this.resourceHandler == null) {
+      throw new IllegalStateException("GobblinServiceManager should be constructed through Guice dependency injection or through a static factory method");
+    }
+  }
+
   @Override
   public void start() throws ApplicationException {
     LOGGER.info("[Init] Starting the Gobblin Service Manager");
 
+    ensureInjected();
+
+    configureServices();
+
     if (this.helixManager.isPresent()) {
       connectHelixManager();
     }
@@ -505,12 +406,12 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
       // Update for first time since there might be no notification
       if (helixManager.get().isLeader()) {
-        if (this.isSchedulerEnabled) {
+        if (configuration.isSchedulerEnabled()) {
           LOGGER.info("[Init] Gobblin Service is running in master instance mode, enabling Scheduler.");
           this.scheduler.setActive(true);
         }
 
-        if (this.isGitConfigMonitorEnabled) {
+        if (configuration.isGitConfigMonitorEnabled()) {
           this.gitConfigMonitor.setActive(true);
         }
 
@@ -519,7 +420,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         }
 
       } else {
-        if (this.isSchedulerEnabled) {
+        if (configuration.isSchedulerEnabled()) {
           LOGGER.info("[Init] Gobblin Service is running in slave instance mode, not enabling Scheduler.");
         }
         if (helixLeaderGauges.isPresent()) {
@@ -532,14 +433,14 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       LOGGER.info("[Init] Gobblin Service is running in single instance mode, enabling Scheduler.");
       this.scheduler.setActive(true);
 
-      if (this.isGitConfigMonitorEnabled) {
+      if (configuration.isGitConfigMonitorEnabled()) {
         this.gitConfigMonitor.setActive(true);
       }
     }
 
     // Populate TopologyCatalog with all Topologies generated by TopologySpecFactory
     // This has to be done after the topologyCatalog service is launched
-    if (this.isTopologySpecFactoryEnabled) {
+    if (configuration.isTopologySpecFactoryEnabled()) {
       Collection<TopologySpec> topologySpecs = this.topologySpecFactory.getTopologies();
       for (TopologySpec topologySpec : topologySpecs) {
         this.topologyCatalog.put(topologySpec);
@@ -548,7 +449,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
     // Register Orchestrator to listen to changes in topology
     // This has to be done after topologySpecFactory has updated spec store, so that listeners will have the latest updates.
-    if (isSchedulerEnabled) {
+    if (configuration.isSchedulerEnabled()) {
       this.topologyCatalog.addListener(this.orchestrator);
     }
 
@@ -560,7 +461,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
 
     //Activate the DagManager service, after the topologyCatalog has been initialized.
     if (!this.helixManager.isPresent() || this.helixManager.get().isLeader()){
-      if (this.isDagManagerEnabled) {
+      if (configuration.isDagManagerEnabled()) {
         this.dagManager.setActive(true);
         this.eventBus.register(this.dagManager);
       }
@@ -592,7 +493,8 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         this.helixManager.get()
             .getMessagingService()
             .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(),
-                new ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, resourceHandler, serviceName));
+                new ControllerUserDefinedMessageHandlerFactory(flowCatalogLocalCommit, scheduler, resourceHandler,
+                    configuration.getServiceName()));
       }
     } catch (Exception e) {
       LOGGER.error("HelixManager failed to connect", e);
@@ -673,9 +575,15 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
       }
 
       Config config = ConfigFactory.load();
-      try (GobblinServiceManager gobblinServiceManager = new GobblinServiceManager(
-          cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd),
-          config, Optional.<Path>absent())) {
+
+      GobblinServiceConfiguration serviceConfiguration =
+          new GobblinServiceConfiguration(cmd.getOptionValue(SERVICE_NAME_OPTION_NAME), getServiceId(cmd), config,
+              null);
+
+      GobblinServiceGuiceModule guiceModule = new GobblinServiceGuiceModule(serviceConfiguration);
+      Injector injector = Guice.createInjector(guiceModule);
+
+      try (GobblinServiceManager gobblinServiceManager = injector.getInstance(GobblinServiceManager.class)) {
         gobblinServiceManager.start();
 
         if (isTestMode) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 31a7361..821d87e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -51,6 +51,8 @@ import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -116,6 +118,7 @@ import static org.apache.gobblin.service.ExecutionStatus.*;
  */
 @Alpha
 @Slf4j
+@Singleton
 public class DagManager extends AbstractIdleService {
   public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
 
@@ -127,8 +130,6 @@ public class DagManager extends AbstractIdleService {
   private static final Integer TERMINATION_TIMEOUT = 30;
   public static final String NUM_THREADS_KEY = DAG_MANAGER_PREFIX + "numThreads";
   public static final String JOB_STATUS_POLLING_INTERVAL_KEY = DAG_MANAGER_PREFIX + "pollingInterval";
-  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
-  private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
   private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
   private static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
   private static final String FAILED_DAG_RETENTION_TIME_UNIT = FAILED_DAG_STATESTORE_PREFIX + ".retention.timeUnit";
@@ -193,7 +194,7 @@ public class DagManager extends AbstractIdleService {
 
   private volatile boolean isActive = false;
 
-  public DagManager(Config config, boolean instrumentationEnabled) {
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever, boolean instrumentationEnabled) {
     this.config = config;
     this.numThreads = ConfigUtils.getInt(config, NUM_THREADS_KEY, DEFAULT_NUM_THREADS);
     this.queue = initializeDagQueue(this.numThreads);
@@ -216,25 +217,12 @@ public class DagManager extends AbstractIdleService {
       this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
     }
 
-    try {
-      this.jobStatusRetriever = createJobStatusRetriever(config);
-    } catch (ReflectiveOperationException e) {
-      throw new RuntimeException("Exception encountered during DagManager initialization", e);
-    }
+    this.jobStatusRetriever = jobStatusRetriever;
 
     TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
     this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
   }
 
-  JobStatusRetriever createJobStatusRetriever(Config config) throws ReflectiveOperationException {
-    Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
-    return (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
-  }
-
-  KafkaJobStatusMonitor createJobStatusMonitor(Config config) throws ReflectiveOperationException {
-    return new KafkaJobStatusMonitorFactory().createJobStatusMonitor(config);
-  }
-
   DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
     try {
       Class dagStateStoreClass = Class.forName(ConfigUtils.getString(config, DAG_STATESTORE_CLASS_KEY, FSDagStateStore.class.getName()));
@@ -254,8 +242,9 @@ public class DagManager extends AbstractIdleService {
     return queue;
   }
 
-  public DagManager(Config config) {
-    this(config, true);
+  @Inject
+  public DagManager(Config config, JobStatusRetriever jobStatusRetriever) {
+    this(config, jobStatusRetriever, true);
   }
 
   /** Start the service. On startup, the service launches a fixed pool of {@link DagManagerThread}s, which are scheduled at
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index cde02b3..635337c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -39,6 +39,8 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nonnull;
+import javax.inject.Inject;
+import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.Setter;
 
@@ -67,11 +69,8 @@ import org.apache.gobblin.service.modules.flow.SpecCompiler;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
-import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.service.monitoring.JobStatusRetriever;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
 
 /**
@@ -79,8 +78,8 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * to {@link TopologyCatalog} and updates {@link SpecCompiler} state.
  */
 @Alpha
+@Singleton
 public class Orchestrator implements SpecCatalogListener, Instrumentable {
-  private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = "jobStatusRetriever.class";
 
   protected final Logger _log;
   protected final SpecCompiler specCompiler;
@@ -104,12 +103,15 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
 
   private Map<String, FlowCompiledState> flowGauges = Maps.newHashMap();
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
-      boolean instrumentationEnabled) {
+
+  public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
+      Optional<DagManager> dagManager, Optional<Logger> log, boolean instrumentationEnabled) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
+    this.flowStatusGenerator = flowStatusGenerator;
+
     try {
       String specCompilerClassName = ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS;
       if (config.hasPath(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY)) {
@@ -146,26 +148,12 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
         ServiceConfigKeys.DEFAULT_FLOW_CONCURRENCY_ALLOWED);
   }
 
-  public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
-    this(config, topologyCatalog, dagManager, log, true);
-  }
-
-  public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Logger log) {
-    this(config, topologyCatalog, dagManager, Optional.of(log));
+  @Inject
+  public Orchestrator(Config config, FlowStatusGenerator flowStatusGenerator, Optional<TopologyCatalog> topologyCatalog,
+      Optional<DagManager> dagManager, Optional<Logger> log) {
+    this(config, flowStatusGenerator, topologyCatalog, dagManager, log, true);
   }
 
-  public Orchestrator(Config config, Logger log) {
-    this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.of(log));
-  }
-
-  /** Constructor with no logging */
-  public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog) {
-    this(config, topologyCatalog, Optional.<DagManager>absent(), Optional.<Logger>absent());
-  }
-
-  public Orchestrator(Config config) {
-    this(config, Optional.<TopologyCatalog>absent(), Optional.<DagManager>absent(), Optional.<Logger>absent());
-  }
 
   @VisibleForTesting
   public SpecCompiler getSpecCompiler() {
@@ -401,18 +389,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable {
     }
   }
 
-  private FlowStatusGenerator buildFlowStatusGenerator(Config config) {
-    JobStatusRetriever jobStatusRetriever;
-    try {
-      Class jobStatusRetrieverClass = Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, FsJobStatusRetriever.class.getName()));
-      jobStatusRetriever =
-          (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config);
-    } catch (ReflectiveOperationException e) {
-      _log.error("Exception encountered when instantiating JobStatusRetriever");
-      throw new RuntimeException(e);
-    }
-    return FlowStatusGenerator.builder().jobStatusRetriever(jobStatusRetriever).build();
-  }
 
   @Nonnull
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 12ce857..f1b6e0a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -35,6 +35,8 @@ import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 import com.linkedin.restli.server.util.PatchApplier;
 
+import javax.inject.Inject;
+import javax.inject.Named;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -48,6 +50,7 @@ import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
 
 
 /**
@@ -65,11 +68,13 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
   private GobblinServiceJobScheduler jobScheduler;
   private boolean forceLeader;
 
-  public GobblinServiceFlowConfigResourceHandler(String serviceName, boolean flowCatalogLocalCommit,
+  @Inject
+  public GobblinServiceFlowConfigResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+      @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
       FlowConfigResourceLocalHandler handler,
       Optional<HelixManager> manager,
       GobblinServiceJobScheduler jobScheduler,
-      boolean forceLeader) {
+      @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
     this.flowCatalogLocalCommit = flowCatalogLocalCommit;
     this.serviceName = serviceName;
     this.localHandler = handler;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
new file mode 100644
index 0000000..74ed9a0
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigV2ResourceHandler.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.restli;
+
+import org.apache.helix.HelixManager;
+
+import com.google.common.base.Optional;
+
+import javax.inject.Inject;
+import javax.inject.Named;
+
+import org.apache.gobblin.service.FlowConfigV2ResourceLocalHandler;
+import org.apache.gobblin.service.FlowConfigsV2ResourceHandler;
+import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+
+
+public class GobblinServiceFlowConfigV2ResourceHandler extends GobblinServiceFlowConfigResourceHandler
+    implements FlowConfigsV2ResourceHandler {
+
+  @Inject
+  public GobblinServiceFlowConfigV2ResourceHandler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
+      @Named(InjectionNames.FLOW_CATALOG_LOCAL_COMMIT) boolean flowCatalogLocalCommit,
+      FlowConfigV2ResourceLocalHandler handler, Optional<HelixManager> manager, GobblinServiceJobScheduler jobScheduler,
+      @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
+    super(serviceName, flowCatalogLocalCommit, handler, manager, jobScheduler, forceLeader);
+  }
+}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
index 538fc57..c2d845d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowExecutionResourceHandler.java
@@ -29,13 +29,18 @@ import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.UpdateResponse;
 
+import javax.inject.Inject;
+import javax.inject.Named;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.FlowExecution;
 import org.apache.gobblin.service.FlowExecutionResourceHandler;
+import org.apache.gobblin.service.FlowExecutionResourceLocalHandler;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.FlowStatusId;
+import org.apache.gobblin.service.modules.core.GobblinServiceManager;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
 import org.apache.gobblin.service.monitoring.KillFlowEvent;
 import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
 
@@ -47,13 +52,15 @@ import org.apache.gobblin.service.monitoring.ResumeFlowEvent;
  */
 @Slf4j
 public class GobblinServiceFlowExecutionResourceHandler implements FlowExecutionResourceHandler {
-  private FlowExecutionResourceHandler localHandler;
+  private FlowExecutionResourceLocalHandler localHandler;
   private EventBus eventBus;
   private Optional<HelixManager> helixManager;
   private boolean forceLeader;
 
-  public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceHandler handler, EventBus eventBus,
-      Optional<HelixManager> manager, boolean forceLeader) {
+  @Inject
+  public GobblinServiceFlowExecutionResourceHandler(FlowExecutionResourceLocalHandler handler,
+      @Named(GobblinServiceManager.SERVICE_EVENT_BUS_NAME) EventBus eventBus,
+      Optional<HelixManager> manager, @Named(InjectionNames.FORCE_LEADER) boolean forceLeader) {
     this.localHandler = handler;
     this.eventBus = eventBus;
     this.helixManager = manager;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index d4cadc6..1c6d89a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -40,6 +40,9 @@ import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import javax.inject.Inject;
+import javax.inject.Named;
+import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -61,6 +64,8 @@ import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.utils.InjectionNames;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
@@ -73,6 +78,7 @@ import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFI
  * and runs them via {@link Orchestrator}.
  */
 @Alpha
+@Singleton
 public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {
 
   // Scheduler related configuration
@@ -105,9 +111,10 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    */
   public static final String DR_FILTER_TAG = "dr";
 
-  public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
-      Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
-      SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+  @Inject
+  public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
+      Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
+      Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -120,11 +127,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
   }
 
-  public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
+  public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
+      Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
-      SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+      SchedulerService schedulerService,  Optional<Logger> log) throws Exception {
     this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
-        new Orchestrator(config, topologyCatalog, dagManager, log), schedulerService, log);
+        new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, log);
   }
 
   public synchronized void setActive(boolean isActive) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
index d1bb267..670641e 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/topology/ConfigBasedTopologySpecFactory.java
@@ -31,16 +31,20 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
+import javax.inject.Inject;
+import javax.inject.Singleton;
+
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.runtime.api.SpecExecutor;
 
 
 
 @Alpha
+@Singleton
 public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
 
   private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults();
@@ -52,6 +56,7 @@ public class ConfigBasedTopologySpecFactory implements TopologySpecFactory {
     this(config, Optional.<Logger>absent());
   }
 
+  @Inject
   public ConfigBasedTopologySpecFactory(Config config, Optional<Logger> log) {
     Preconditions.checkNotNull(config, "Config should not be null");
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
similarity index 65%
copy from gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
copy to gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
index 639c6a1..ebe6142 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/NoopGroupOwnershipService.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/utils/InjectionNames.java
@@ -14,20 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gobblin.service;
 
-import com.typesafe.config.Config;
-import java.util.List;
-import org.apache.gobblin.annotation.Alias;
+package org.apache.gobblin.service.modules.utils;
 
-
-@Alias("noop")
-public class NoopGroupOwnershipService extends GroupOwnershipService{
-
-   public NoopGroupOwnershipService(Config config) {
-   }
-
-   public boolean isMemberOfGroup(List<ServiceRequester> serviceRequesters, String group) {
-      return true;
-   }
+/**
+ * These names are used for dependency injection, when we need to inject different instances of the same type,
+ * or inject constants.
+ * */
+public final class InjectionNames {
+  public static final String SERVICE_NAME = "serviceName";
+  public static final String FORCE_LEADER = "forceLeader";
+  public static final String FLOW_CATALOG_LOCAL_COMMIT = "flowCatalogLocalCommit";
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
index fba549f..85ee51a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/FsJobStatusRetriever.java
@@ -25,15 +25,14 @@ import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
-import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
 
+import javax.inject.Inject;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -41,7 +40,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStore;
 import org.apache.gobblin.metastore.FileContextBasedFsStateStoreFactory;
 import org.apache.gobblin.metastore.FsStateStore;
-import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 /**
@@ -56,6 +54,7 @@ public class FsJobStatusRetriever extends JobStatusRetriever {
   @Getter
   private final FileContextBasedFsStateStore<State> stateStore;
 
+  @Inject
   public FsJobStatusRetriever(Config config) {
     this.stateStore = (FileContextBasedFsStateStore<State>) new FileContextBasedFsStateStoreFactory().
         createStateStore(config.getConfig(CONF_PREFIX), State.class);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
index 6e9b64b..741a246 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitorFactory.java
@@ -21,6 +21,8 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import javax.inject.Inject;
+import javax.inject.Provider;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
@@ -34,11 +36,18 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
  * A factory implementation that returns a {@link KafkaJobStatusMonitor} instance.
  */
 @Slf4j
-public class KafkaJobStatusMonitorFactory {
+public class KafkaJobStatusMonitorFactory implements Provider<KafkaJobStatusMonitor> {
   private static final String KAFKA_SSL_CONFIG_PREFIX_KEY = "jobStatusMonitor.kafka.config";
   private static final String DEFAULT_KAFKA_SSL_CONFIG_PREFIX = "metrics.reporting.kafka.config";
 
-  public KafkaJobStatusMonitor createJobStatusMonitor(Config config)
+  Config config;
+
+  @Inject
+  public KafkaJobStatusMonitorFactory(Config config) {
+    this.config = config;
+  }
+
+  private KafkaJobStatusMonitor createJobStatusMonitor()
       throws ReflectiveOperationException {
     Config jobStatusConfig = config.getConfig(KafkaJobStatusMonitor.JOB_STATUS_MONITOR_PREFIX);
 
@@ -64,4 +73,13 @@ public class KafkaJobStatusMonitorFactory {
     jobStatusConfig = jobStatusConfig.withFallback(kafkaSslConfig).withFallback(schemaRegistryConfig);
     return (KafkaJobStatusMonitor) GobblinConstructorUtils.invokeLongestConstructor(jobStatusMonitorClass, topic, jobStatusConfig, numThreads);
   }
+
+  @Override
+  public KafkaJobStatusMonitor get() {
+    try {
+      return createJobStatusMonitor();
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index f753e8c..ef30050 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -41,7 +41,6 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.gson.Gson;
@@ -49,13 +48,11 @@ import com.google.gson.GsonBuilder;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.RestLiResponseException;
-import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
-import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -101,7 +98,7 @@ public class GobblinServiceManagerTest {
 
   private final URI TEST_URI = FlowSpec.Utils.createFlowSpecUri(TEST_FLOW_ID);
 
-  private MockGobblinServiceManager gobblinServiceManager;
+  private GobblinServiceManager gobblinServiceManager;
   private FlowConfigV2Client flowConfigClient;
 
   private Git gitForPush;
@@ -168,12 +165,12 @@ public class GobblinServiceManagerTest {
     this.gitForPush.commit().setMessage("First commit").call();
     this.gitForPush.push().setRemote("origin").setRefSpecs(new RefSpec("master")).call();
 
-    this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
-        ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
+    this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1",
+        ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR));
     this.gobblinServiceManager.start();
 
     this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
-        this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort()), transportClientProperties);
+        this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties);
   }
 
   private void cleanUpDir(String dir) throws Exception {
@@ -508,23 +505,10 @@ null, null, null, null);
 
   private void serviceReboot() throws Exception {
     this.gobblinServiceManager.stop();
-    this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
-        ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
-    this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
-        this.gobblinServiceManager.getRestLiServer().getPort()), transportClientProperties);
+    this.gobblinServiceManager = GobblinServiceManager.create("CoreService", "1",
+        ConfigUtils.propertiesToConfig(serviceCoreProperties), new Path(SERVICE_WORK_DIR));
     this.gobblinServiceManager.start();
-  }
-
-  class MockGobblinServiceManager extends GobblinServiceManager {
-
-    public MockGobblinServiceManager(String serviceName, String serviceId, Config config,
-        Optional<Path> serviceWorkDirOptional)
-        throws Exception {
-      super(serviceName, serviceId, config, serviceWorkDirOptional);
-    }
-
-    protected EmbeddedRestliServer getRestLiServer() {
-      return this.restliServer;
-    }
+    this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+        this.gobblinServiceManager.getRestLiServerListeningURI().getPort()), transportClientProperties);
   }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
index cd10a82..4263b31 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java
@@ -25,9 +25,6 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.hadoop.fs.Path;
 import org.eclipse.jetty.http.HttpStatus;
-import org.jetbrains.annotations.Nullable;
-import org.mockito.Mockito;
-import org.mockito.exceptions.base.MockitoAssertionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -35,13 +32,10 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
-import com.google.common.eventbus.EventBus;
 import com.linkedin.data.template.StringMap;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.RestLiResponseException;
-import com.typesafe.config.Config;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlJobStatusStateStoreFactory;
@@ -53,12 +47,11 @@ import org.apache.gobblin.service.FlowConfigClient;
 import org.apache.gobblin.service.FlowId;
 import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.service.ServiceConfigKeys;
-import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.utils.HelixUtils;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
-import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
+
 @Test
 public class GobblinServiceHATest {
 
@@ -148,6 +141,7 @@ public class GobblinServiceHATest {
     commonServiceCoreProperties.put("zookeeper.connect", testingZKServer.getConnectString());
     commonServiceCoreProperties.put(ConfigurationKeys.STATE_STORE_FACTORY_CLASS_KEY, MysqlJobStatusStateStoreFactory.class.getName());
     commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
+    commonServiceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY, false);
 
     Properties node1ServiceCoreProperties = new Properties();
     node1ServiceCoreProperties.putAll(commonServiceCoreProperties);
@@ -166,13 +160,13 @@ public class GobblinServiceHATest {
     node2ServiceCoreProperties.put(QUARTZ_THREAD_POOL_COUNT, 3);
 
     // Start Node 1
-    this.node1GobblinServiceManager = new TestGobblinServiceManager("CoreService1", "1",
-        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+    this.node1GobblinServiceManager = GobblinServiceManager.create("CoreService1", "1",
+        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR));
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = new TestGobblinServiceManager("CoreService2", "2",
-        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+    this.node2GobblinServiceManager = GobblinServiceManager.create("CoreService2", "2",
+        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR));
     this.node2GobblinServiceManager.start();
 
     // Initialize Node 1 Client
@@ -549,30 +543,7 @@ public class GobblinServiceHATest {
     logger.info("Total failover time in ms: " + (failOverEndTime - failOverStartTime));
 
     Assert.assertTrue(assertSuccess, "New master should take over all old master jobs.");
-
-    // Check eventbus was registered with new leader
-    AssertWithBackoff assertWithBackoff = AssertWithBackoff.create().logger(LoggerFactory.getLogger("checkEventbusRegistered")).timeoutMs(20000);
-    assertWithBackoff.assertTrue(new com.google.common.base.Predicate<Void>() {
-      @Override
-      public boolean apply(@Nullable Void input) {
-        try {
-          Mockito.verify(secondary.getEventBus(), Mockito.atLeastOnce()).register(secondary.getDagManager());
-          return true;
-        } catch (MockitoAssertionError e) {
-          return false;
-        }
-      }
-    }, "Checking eventBus was registered");
-
     logger.info("+++++++++++++++++++ testKillNode END");
   }
 
-  public class TestGobblinServiceManager extends GobblinServiceManager {
-    public TestGobblinServiceManager(String serviceName, String serviceId, Config config, Optional<Path> serviceWorkDirOptional) throws Exception {
-      super(serviceName, serviceId, config, serviceWorkDirOptional);
-      this.isDagManagerEnabled = true;
-      this.eventBus = Mockito.mock(EventBus.class);
-      this.dagManager = Mockito.mock(DagManager.class);
-    }
-  }
 }
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
index ce4cf71..003e318 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceRedirectTest.java
@@ -31,7 +31,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.linkedin.data.template.StringMap;
@@ -167,13 +166,13 @@ public class GobblinServiceRedirectTest {
     node2ServiceCoreProperties.put(ServiceConfigKeys.SERVICE_PORT, port2);
 
     // Start Node 1
-    this.node1GobblinServiceManager = new GobblinServiceManager("RedirectCoreService1", "1",
-        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), Optional.of(new Path(NODE_1_SERVICE_WORK_DIR)));
+    this.node1GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService1", "1",
+        ConfigUtils.propertiesToConfig(node1ServiceCoreProperties), new Path(NODE_1_SERVICE_WORK_DIR));
     this.node1GobblinServiceManager.start();
 
     // Start Node 2
-    this.node2GobblinServiceManager = new GobblinServiceManager("RedirectCoreService2", "2",
-        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), Optional.of(new Path(NODE_2_SERVICE_WORK_DIR)));
+    this.node2GobblinServiceManager = GobblinServiceManager.create("RedirectCoreService2", "2",
+        ConfigUtils.propertiesToConfig(node2ServiceCoreProperties), new Path(NODE_2_SERVICE_WORK_DIR));
     this.node2GobblinServiceManager.start();
 
     // Initialize Node 1 Client
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
index 67ade43..92810e0 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerFlowTest.java
@@ -297,11 +297,10 @@ class CancelPredicate implements Predicate<Void> {
 class MockedDagManager extends DagManager {
 
   public MockedDagManager(Config config, boolean instrumentationEnabled) {
-    super(config, instrumentationEnabled);
+    super(config, createJobStatusRetriever(), instrumentationEnabled);
   }
 
-  @Override
-  JobStatusRetriever createJobStatusRetriever(Config config) {
+  private static JobStatusRetriever createJobStatusRetriever() {
     JobStatusRetriever mockedJbStatusRetriever = Mockito.mock(JobStatusRetriever.class);
     Mockito.doReturn(Collections.emptyIterator()).when(mockedJbStatusRetriever).
         getJobStatusesForFlowExecution(anyString(), anyString(), anyLong(), anyString(), anyString());
@@ -309,11 +308,6 @@ class MockedDagManager extends DagManager {
   }
 
   @Override
-  KafkaJobStatusMonitor createJobStatusMonitor(Config config) {
-    return null;
-  }
-
-  @Override
   DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topologySpecMap) {
     DagStateStore mockedDagStateStore = Mockito.mock(DagStateStore.class);
 
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 7087c1e..689bfa2 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.service.modules.orchestration;
 
-import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecExecutor;
-import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
-import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import java.io.File;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -29,6 +25,8 @@ import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.io.FileUtils;
+
+import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -45,11 +43,15 @@ import com.typesafe.config.Config;
 
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
 import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
+import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 
@@ -108,6 +110,7 @@ public class OrchestratorTest {
     this.serviceLauncher.addService(flowCatalog);
 
     this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
+        mock(FlowStatusGenerator.class),
         Optional.of(this.topologyCatalog), Optional.<DagManager>absent(), Optional.of(logger));
     this.topologyCatalog.addListener(orchestrator);
     this.flowCatalog.addListener(orchestrator);
diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle
index 7de8178..e37d324 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -113,6 +113,7 @@ ext.externalDependency = [
     "mysqlConnector": "mysql:mysql-connector-java:8.0.24",
     "javaxInject": "javax.inject:javax.inject:1",
     "guice": "com.google.inject:guice:4.0",
+    "guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
     "mockito": "org.mockito:mockito-core:1.10.19",