You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/16 22:05:26 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in cluster mode

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aa9e6e  [GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in cluster mode
9aa9e6e is described below

commit 9aa9e6e67ab4479cc521bb95577a85c86727e533
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Sep 16 15:05:17 2019 -0700

    [GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in cluster mode
    
    Closes #2729 from sv2000/metrics
---
 .../apache/gobblin/aws/GobblinAWSTaskRunner.java   |   2 +-
 .../gobblin/cluster/ContainerHealthMetrics.java    |  34 ++++++
 .../cluster/ContainerHealthMetricsService.java     | 134 +++++++++++++++++++++
 .../cluster/GobblinClusterConfigurationKeys.java   |   2 +
 .../gobblin/cluster/GobblinClusterManager.java     |   5 +
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |   8 +-
 .../cluster/ContainerHealthMetricsServiceTest.java |  39 ++++++
 .../gobblin/yarn/GobblinApplicationMaster.java     |   5 +-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   3 +
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |   7 +-
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  12 +-
 11 files changed, 243 insertions(+), 8 deletions(-)

diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index cd2045f..4bff1ff 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -84,7 +84,7 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner {
 
   @Override
   public List<Service> getServices() {
-    return Collections.emptyList();
+    return super.getServices();
   }
 
   @Override
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
new file mode 100644
index 0000000..2bf187c
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+public class ContainerHealthMetrics {
+  public static final String CONTAINER_METRICS_PREFIX = "container.health.metrics.";
+  // Metric names related to CPU usage and system load.
+  public static final String PROCESS_CPU_LOAD = CONTAINER_METRICS_PREFIX + "processCpuLoad";
+  public static final String PROCESS_CPU_TIME = CONTAINER_METRICS_PREFIX + "processCpuTime";
+  public static final String SYSTEM_CPU_LOAD = CONTAINER_METRICS_PREFIX + "systemCpuLoad";
+  public static final String SYSTEM_LOAD_AVG = CONTAINER_METRICS_PREFIX + "systemLoadAvg";
+  public static final String NUM_AVAILABLE_PROCESSORS = CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
+  // Metric names related to heap, virtual/physical memory and swap space.
+  public static final String PROCESS_HEAP_USED_SIZE = CONTAINER_METRICS_PREFIX + "processHeapUsedSize";
+  public static final String COMMITTED_VMEM_SIZE = CONTAINER_METRICS_PREFIX + "committedVmemSize";
+  public static final String TOTAL_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
+  public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "freePhysicalMemSize";
+  public static final String TOTAL_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX + "totalSwapSpaceSize";
+  public static final String FREE_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX + "freeSwapSpaceSize";
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
new file mode 100644
index 0000000..b043857
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.sun.management.OperatingSystemMXBean;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A scheduled service that periodically samples container health figures (CPU load and CPU time of the
+ * JVM process, system load, physical memory, swap and heap usage) and exposes the latest sample as gauges.
+ *
+ * <p>
+ *   Because it extends {@link AbstractScheduledService}, an instance can be handed to a
+ *   {@link com.google.common.util.concurrent.ServiceManager} that manages its lifecycle.
+ * </p>
+ * TODO: Add Garbage Collection metrics.
+ */
+public class ContainerHealthMetricsService extends AbstractScheduledService {
+  // Configuration key and default value for the sampling period, both expressed in seconds.
+  private static final String CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS = "container.health.metrics.service.reportingIntervalSeconds";
+  private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L;
+
+  private final long metricReportingInterval;
+  private final OperatingSystemMXBean operatingSystemMXBean;
+  private final MemoryMXBean memoryMXBean;
+
+  // Latest sampled values: written by runOneIteration(), read by the gauges built in buildGaugeList().
+  AtomicDouble processCpuLoad = new AtomicDouble(0);
+  AtomicDouble systemCpuLoad = new AtomicDouble(0);
+  AtomicDouble systemLoadAvg = new AtomicDouble(0);
+  AtomicLong processCpuTime = new AtomicLong(0);
+  AtomicLong numAvailableProcessors = new AtomicLong(0);
+  AtomicLong processHeapUsedSize = new AtomicLong(0);
+  AtomicLong committedVmemSize = new AtomicLong(0);
+  AtomicLong totalPhysicalMemSize = new AtomicLong(0);
+  AtomicLong freePhysicalMemSize = new AtomicLong(0);
+  AtomicLong totalSwapSpaceSize = new AtomicLong(0);
+  AtomicLong freeSwapSpaceSize = new AtomicLong(0);
+
+  public ContainerHealthMetricsService(Config config) {
+    this.metricReportingInterval = ConfigUtils.getLong(config,
+        CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS, DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL);
+    this.operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+    this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+    // Create every gauge and register it with the root metric context.
+    for (ContextAwareGauge<Double> gauge : buildGaugeList()) {
+      RootMetricContext.get().register(gauge);
+    }
+  }
+
+  /**
+   * Take one sample of every health figure. If an invocation of this method throws, the service moves to
+   * the {@link com.google.common.util.concurrent.Service.State#FAILED} state and is not invoked again.
+   */
+  @Override
+  protected void runOneIteration() throws Exception {
+    final OperatingSystemMXBean osBean = this.operatingSystemMXBean;
+    this.processCpuLoad.set(osBean.getProcessCpuLoad());
+    this.systemCpuLoad.set(osBean.getSystemCpuLoad());
+    this.systemLoadAvg.set(osBean.getSystemLoadAverage());
+    this.committedVmemSize.set(osBean.getCommittedVirtualMemorySize());
+    this.processCpuTime.set(osBean.getProcessCpuTime());
+    this.freeSwapSpaceSize.set(osBean.getFreeSwapSpaceSize());
+    this.numAvailableProcessors.set(osBean.getAvailableProcessors());
+    this.totalPhysicalMemSize.set(osBean.getTotalPhysicalMemorySize());
+    this.totalSwapSpaceSize.set(osBean.getTotalSwapSpaceSize());
+    this.freePhysicalMemSize.set(osBean.getFreePhysicalMemorySize());
+    this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed());
+  }
+
+  /** Build one {@link ContextAwareGauge} per sampled figure; long-valued figures are reported as doubles. */
+  protected List<ContextAwareGauge<Double>> buildGaugeList() {
+    RootMetricContext rootContext = RootMetricContext.get();
+    List<ContextAwareGauge<Double>> gauges = new ArrayList<>();
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD,
+        () -> this.processCpuLoad.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD,
+        () -> this.systemCpuLoad.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG,
+        () -> this.systemLoadAvg.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE,
+        () -> this.committedVmemSize.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.PROCESS_CPU_TIME,
+        () -> this.processCpuTime.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE,
+        () -> this.freeSwapSpaceSize.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS,
+        () -> this.numAvailableProcessors.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE,
+        () -> this.totalPhysicalMemSize.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE,
+        () -> this.totalSwapSpaceSize.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE,
+        () -> this.freePhysicalMemSize.doubleValue()));
+    gauges.add(rootContext.newContextAwareGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE,
+        () -> this.processHeapUsedSize.doubleValue()));
+    return gauges;
+  }
+
+  /** Schedules iterations at a fixed rate of {@code metricReportingInterval} seconds, with no initial delay. */
+  @Override
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedRateSchedule(0, this.metricReportingInterval, TimeUnit.SECONDS);
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 15c197f..4b21f4e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -167,4 +167,6 @@ public class GobblinClusterConfigurationKeys {
 
   public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
   public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
+  public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
+  public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = false;
 }
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c241c9b..2826720 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -191,6 +191,11 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
     this.applicationLauncher.addService(this.jobScheduler);
     this.jobConfigurationManager = buildJobConfigurationManager(config);
     this.applicationLauncher.addService(this.jobConfigurationManager);
+
+    if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+        GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
+      this.applicationLauncher.addService(new ContainerHealthMetricsService(config));
+    }
   }
 
   /**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index fd17216..aad3908 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -196,6 +196,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
     this.services.addAll(suite.getServices());
 
     this.services.addAll(getServices());
+
     if (this.services.isEmpty()) {
       this.serviceManager = null;
     } else {
@@ -335,7 +336,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
    * @return a {@link List} of additional {@link Service}s to run.
    */
   protected List<Service> getServices() {
-    return new ArrayList<>();
+    List<Service> serviceList = new ArrayList<>();
+    if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+        GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
+      serviceList.add(new ContainerHealthMetricsService(config));
+    }
+    return serviceList;
   }
 
   @VisibleForTesting
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
new file mode 100644
index 0000000..ac67c01
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+public class ContainerHealthMetricsServiceTest {
+
+  // Runs two iterations and checks that the sampled process CPU time does not decrease in between.
+  @Test
+  public void testRunOneIteration() throws Exception {
+    Config emptyConfig = ConfigFactory.empty();
+    ContainerHealthMetricsService metricsService = new ContainerHealthMetricsService(emptyConfig);
+    metricsService.runOneIteration();
+    long cpuTimeBefore = metricsService.processCpuTime.get();
+    Thread.sleep(10);
+    metricsService.runOneIteration();
+    Assert.assertTrue(cpuTimeBefore <= metricsService.processCpuTime.get());
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index acf31b1..282156d 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -47,6 +47,7 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Getter;
 
@@ -82,7 +83,9 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
   public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
       YarnConfiguration yarnConfiguration) throws Exception {
     super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
-        GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
+        GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+            ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+        Optional.<Path>absent());
 
     String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
     GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 10ae50c..882b1d5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -105,4 +105,7 @@ public class GobblinYarnConfigurationKeys {
   //Constant definitions
   public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
   public static final String JVM_USER_TIMEZONE_CONFIG = "user.timezone";
+
+  //Configuration properties relating to container mode of execution e.g. Gobblin cluster runs on Yarn
+  public static final String CONTAINER_NUM_KEY = "container.num";
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index c90306c..60e4a76 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -41,6 +41,7 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.Service;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
 import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
 import org.apache.gobblin.cluster.GobblinClusterUtils;
@@ -53,18 +54,18 @@ import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
 public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
-  private final ContainerId containerId;
 
   public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
-        GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
-    this.containerId = containerId;
+        GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+            ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))), appWorkDirOptional);
   }
 
   @Override
   public List<Service> getServices() {
     List<Service> services = new ArrayList<>();
+    services.addAll(super.getServices());
     if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
       LOGGER.info("Adding YarnContainerSecurityManager since login is keytab based");
       services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index c88d7fe..7b7f8f5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.yarn;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Map;
 
@@ -133,4 +131,14 @@ public class YarnHelixUtils {
 
     return environmentVariableMap;
   }
+
+  /**
+   * Build a short container label: "container-" followed by the part of the id after its last underscore.
+   * @param containerId e.g. "container_e94_1567552810874_2132400_01_000001"
+   * @return sequence number of the containerId e.g. "container-000001"
+   */
+  public static String getContainerNum(String containerId) {
+    int sequenceStart = containerId.lastIndexOf('_') + 1;
+    return "container-" + containerId.substring(sequenceStart);
+  }
 }