You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/09/16 22:05:26 UTC
[incubator-gobblin] branch master updated:
[GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in
cluster mode
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9aa9e6e [GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in cluster mode
9aa9e6e is described below
commit 9aa9e6e67ab4479cc521bb95577a85c86727e533
Author: sv2000 <su...@gmail.com>
AuthorDate: Mon Sep 16 15:05:17 2019 -0700
[GOBBLIN-875][GOBBLIN-685] Emit container health metrics when running in cluster mode
Closes #2729 from sv2000/metrics
---
.../apache/gobblin/aws/GobblinAWSTaskRunner.java | 2 +-
.../gobblin/cluster/ContainerHealthMetrics.java | 34 ++++++
.../cluster/ContainerHealthMetricsService.java | 134 +++++++++++++++++++++
.../cluster/GobblinClusterConfigurationKeys.java | 2 +
.../gobblin/cluster/GobblinClusterManager.java | 5 +
.../apache/gobblin/cluster/GobblinTaskRunner.java | 8 +-
.../cluster/ContainerHealthMetricsServiceTest.java | 39 ++++++
.../gobblin/yarn/GobblinApplicationMaster.java | 5 +-
.../gobblin/yarn/GobblinYarnConfigurationKeys.java | 3 +
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 7 +-
.../org/apache/gobblin/yarn/YarnHelixUtils.java | 12 +-
11 files changed, 243 insertions(+), 8 deletions(-)
diff --git a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
index cd2045f..4bff1ff 100644
--- a/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
+++ b/gobblin-aws/src/main/java/org/apache/gobblin/aws/GobblinAWSTaskRunner.java
@@ -84,7 +84,7 @@ public class GobblinAWSTaskRunner extends GobblinTaskRunner {
@Override
public List<Service> getServices() {
- return Collections.emptyList();
+ return super.getServices();
}
@Override
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
new file mode 100644
index 0000000..2bf187c
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetrics.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+/**
+ * Holder for the metric names used by {@link ContainerHealthMetricsService} when it builds its gauges.
+ * Every name is formed by appending a metric-specific suffix to {@link #CONTAINER_METRICS_PREFIX}.
+ */
+public class ContainerHealthMetrics {
+  /** Common prefix shared by every container health metric name. */
+  public static final String CONTAINER_METRICS_PREFIX = "container.health.metrics.";
+
+  public static final String PROCESS_CPU_LOAD = CONTAINER_METRICS_PREFIX + "processCpuLoad";
+  public static final String PROCESS_CPU_TIME = CONTAINER_METRICS_PREFIX + "processCpuTime";
+  public static final String PROCESS_HEAP_USED_SIZE = CONTAINER_METRICS_PREFIX + "processHeapUsedSize";
+  public static final String SYSTEM_CPU_LOAD = CONTAINER_METRICS_PREFIX + "systemCpuLoad";
+  public static final String SYSTEM_LOAD_AVG = CONTAINER_METRICS_PREFIX + "systemLoadAvg";
+  public static final String COMMITTED_VMEM_SIZE = CONTAINER_METRICS_PREFIX + "committedVmemSize";
+  public static final String FREE_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX + "freeSwapSpaceSize";
+  public static final String TOTAL_SWAP_SPACE_SIZE = CONTAINER_METRICS_PREFIX + "totalSwapSpaceSize";
+  public static final String NUM_AVAILABLE_PROCESSORS = CONTAINER_METRICS_PREFIX + "numAvailableProcessors";
+  public static final String TOTAL_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "totalPhysicalMemSize";
+  public static final String FREE_PHYSICAL_MEM_SIZE = CONTAINER_METRICS_PREFIX + "freePhysicalMemSize";
+
+  /** Constants holder only; there is nothing to instantiate. */
+  private ContainerHealthMetrics() {
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
new file mode 100644
index 0000000..b043857
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ContainerHealthMetricsService.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.AtomicDouble;
+import com.sun.management.OperatingSystemMXBean;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.metrics.ContextAwareGauge;
+import org.apache.gobblin.metrics.RootMetricContext;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * A service that periodically samples JVM and operating-system statistics and exposes them as gauges
+ * registered with the {@link RootMetricContext}. Reported metrics include CPU/memory usage of the JVM,
+ * system load etc.
+ *
+ * <p>
+ * This class extends {@link AbstractScheduledService} so it can be used with a
+ * {@link com.google.common.util.concurrent.ServiceManager} that manages the lifecycle of
+ * a {@link ContainerHealthMetricsService}.
+ * </p>
+ * TODO: Add Garbage Collection metrics.
+ */
+public class ContainerHealthMetricsService extends AbstractScheduledService {
+  //Container metrics service configurations
+  private static final String CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS = "container.health.metrics.service.reportingIntervalSeconds";
+  private static final Long DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL = 30L;
+
+  // Period, in seconds, between two consecutive samples (see scheduler()).
+  private final long metricReportingInterval;
+  private final OperatingSystemMXBean operatingSystemMXBean;
+  private final MemoryMXBean memoryMXBean;
+
+  // Latest sampled values: runOneIteration() writes them and the gauges built in buildGaugeList() read them.
+  // They stay package-private so tests in this package can inspect them, and are final because the gauges
+  // hold on to these exact holder objects.
+  final AtomicDouble processCpuLoad = new AtomicDouble(0);
+  final AtomicDouble systemCpuLoad = new AtomicDouble(0);
+  final AtomicDouble systemLoadAvg = new AtomicDouble(0);
+  final AtomicLong committedVmemSize = new AtomicLong(0);
+  final AtomicLong processCpuTime = new AtomicLong(0);
+  final AtomicLong freeSwapSpaceSize = new AtomicLong(0);
+  final AtomicLong numAvailableProcessors = new AtomicLong(0);
+  final AtomicLong totalPhysicalMemSize = new AtomicLong(0);
+  final AtomicLong totalSwapSpaceSize = new AtomicLong(0);
+  final AtomicLong freePhysicalMemSize = new AtomicLong(0);
+  final AtomicLong processHeapUsedSize = new AtomicLong(0);
+
+  /**
+   * Creates the service, looks up the platform MXBeans and registers one gauge per metric with the
+   * {@link RootMetricContext}.
+   *
+   * @param config configuration from which the reporting interval (in seconds) is read;
+   *               {@code DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL} is passed as the fallback value
+   */
+  public ContainerHealthMetricsService(Config config) {
+    this.metricReportingInterval = ConfigUtils.getLong(config, CONTAINER_METRICS_SERVICE_REPORTING_INTERVAL_SECONDS, DEFAULT_CONTAINER_METRICS_REPORTING_INTERVAL);
+    this.operatingSystemMXBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+    this.memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+    //Build all the gauges and register them with the metrics registry.
+    // NOTE(review): buildGaugeList() is overridable and runs before any subclass constructor body;
+    // overriding implementations must not rely on subclass state.
+    List<ContextAwareGauge<Double>> systemMetrics = buildGaugeList();
+    systemMetrics.forEach(metric -> RootMetricContext.get().register(metric));
+  }
+
+  /**
+   * Run one iteration of the scheduled task: copy the current readings of the operating-system and memory
+   * MXBeans into the atomic holders. If any invocation of this method throws an exception,
+   * the service will transition to the {@link com.google.common.util.concurrent.Service.State#FAILED} state and this method will no
+   * longer be called.
+   */
+  @Override
+  protected void runOneIteration() throws Exception {
+    this.processCpuLoad.set(this.operatingSystemMXBean.getProcessCpuLoad());
+    this.systemCpuLoad.set(this.operatingSystemMXBean.getSystemCpuLoad());
+    this.systemLoadAvg.set(this.operatingSystemMXBean.getSystemLoadAverage());
+    this.committedVmemSize.set(this.operatingSystemMXBean.getCommittedVirtualMemorySize());
+    this.processCpuTime.set(this.operatingSystemMXBean.getProcessCpuTime());
+    this.freeSwapSpaceSize.set(this.operatingSystemMXBean.getFreeSwapSpaceSize());
+    this.numAvailableProcessors.set(this.operatingSystemMXBean.getAvailableProcessors());
+    this.totalPhysicalMemSize.set(this.operatingSystemMXBean.getTotalPhysicalMemorySize());
+    this.totalSwapSpaceSize.set(this.operatingSystemMXBean.getTotalSwapSpaceSize());
+    this.freePhysicalMemSize.set(this.operatingSystemMXBean.getFreePhysicalMemorySize());
+    this.processHeapUsedSize.set(this.memoryMXBean.getHeapMemoryUsage().getUsed());
+  }
+
+  /**
+   * Builds one gauge per container health metric, named after the constants in {@link ContainerHealthMetrics}.
+   * Every gauge reports its value as a {@code Double}; the long-valued samples are widened to double.
+   *
+   * @return the gauges, in a fixed order, not yet registered by this method
+   */
+  protected List<ContextAwareGauge<Double>> buildGaugeList() {
+    List<ContextAwareGauge<Double>> gaugeList = new ArrayList<>();
+    gaugeList.add(doubleGauge(ContainerHealthMetrics.PROCESS_CPU_LOAD, this.processCpuLoad));
+    gaugeList.add(doubleGauge(ContainerHealthMetrics.SYSTEM_CPU_LOAD, this.systemCpuLoad));
+    gaugeList.add(doubleGauge(ContainerHealthMetrics.SYSTEM_LOAD_AVG, this.systemLoadAvg));
+    gaugeList.add(longGauge(ContainerHealthMetrics.COMMITTED_VMEM_SIZE, this.committedVmemSize));
+    gaugeList.add(longGauge(ContainerHealthMetrics.PROCESS_CPU_TIME, this.processCpuTime));
+    gaugeList.add(longGauge(ContainerHealthMetrics.FREE_SWAP_SPACE_SIZE, this.freeSwapSpaceSize));
+    gaugeList.add(longGauge(ContainerHealthMetrics.NUM_AVAILABLE_PROCESSORS, this.numAvailableProcessors));
+    gaugeList.add(longGauge(ContainerHealthMetrics.TOTAL_PHYSICAL_MEM_SIZE, this.totalPhysicalMemSize));
+    gaugeList.add(longGauge(ContainerHealthMetrics.TOTAL_SWAP_SPACE_SIZE, this.totalSwapSpaceSize));
+    gaugeList.add(longGauge(ContainerHealthMetrics.FREE_PHYSICAL_MEM_SIZE, this.freePhysicalMemSize));
+    gaugeList.add(longGauge(ContainerHealthMetrics.PROCESS_HEAP_USED_SIZE, this.processHeapUsedSize));
+    return gaugeList;
+  }
+
+  /** Creates a gauge named {@code name} that reports the current value of {@code source}. */
+  private static ContextAwareGauge<Double> doubleGauge(String name, AtomicDouble source) {
+    return RootMetricContext.get().newContextAwareGauge(name, () -> source.get());
+  }
+
+  /** Creates a gauge named {@code name} that reports the current value of {@code source} widened to double. */
+  private static ContextAwareGauge<Double> longGauge(String name, AtomicLong source) {
+    return RootMetricContext.get().newContextAwareGauge(name, () -> (double) source.get());
+  }
+
+  /**
+   * Returns the {@link Scheduler} object used to configure this service: a fixed-rate schedule with no
+   * initial delay and a period of {@code metricReportingInterval} seconds. This method will only be
+   * called once.
+   */
+  @Override
+  protected Scheduler scheduler() {
+    return Scheduler.newFixedRateSchedule(0, this.metricReportingInterval, TimeUnit.SECONDS);
+  }
+}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 15c197f..4b21f4e 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -167,4 +167,6 @@ public class GobblinClusterConfigurationKeys {
public static final String HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "job.stoppingStateTimeoutSeconds";
public static final long DEFAULT_HELIX_JOB_STOPPING_STATE_TIMEOUT_SECONDS = 300;
+ public static final String CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.health.metrics.service.enabled" ;
+ public static final boolean DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED = false;
}
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index c241c9b..2826720 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -191,6 +191,11 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri
this.applicationLauncher.addService(this.jobScheduler);
this.jobConfigurationManager = buildJobConfigurationManager(config);
this.applicationLauncher.addService(this.jobConfigurationManager);
+
+ if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+ GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
+ this.applicationLauncher.addService(new ContainerHealthMetricsService(config));
+ }
}
/**
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index fd17216..aad3908 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -196,6 +196,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
this.services.addAll(suite.getServices());
this.services.addAll(getServices());
+
if (this.services.isEmpty()) {
this.serviceManager = null;
} else {
@@ -335,7 +336,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
* @return a {@link List} of additional {@link Service}s to run.
*/
protected List<Service> getServices() {
- return new ArrayList<>();
+ List<Service> serviceList = new ArrayList<>();
+ if (ConfigUtils.getBoolean(this.config, GobblinClusterConfigurationKeys.CONTAINER_HEALTH_METRICS_SERVICE_ENABLED,
+ GobblinClusterConfigurationKeys.DEFAULT_CONTAINER_HEALTH_METRICS_SERVICE_ENABLED)) {
+ serviceList.add(new ContainerHealthMetricsService(config));
+ }
+ return serviceList;
}
@VisibleForTesting
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
new file mode 100644
index 0000000..ac67c01
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ContainerHealthMetricsServiceTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.cluster;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+
+/**
+ * Unit test for {@link ContainerHealthMetricsService}.
+ */
+public class ContainerHealthMetricsServiceTest {
+
+  /**
+   * Samples the metrics twice, a few milliseconds apart, and checks that the recorded process CPU time
+   * does not decrease between the two samples.
+   */
+  @Test
+  public void testRunOneIteration() throws Exception {
+    Config emptyConfig = ConfigFactory.empty();
+    ContainerHealthMetricsService metricsService = new ContainerHealthMetricsService(emptyConfig);
+
+    // First sample.
+    metricsService.runOneIteration();
+    long cpuTimeBefore = metricsService.processCpuTime.get();
+
+    Thread.sleep(10);
+
+    // Second sample, taken after the short pause.
+    metricsService.runOneIteration();
+    long cpuTimeAfter = metricsService.processCpuTime.get();
+
+    Assert.assertTrue(cpuTimeAfter >= cpuTimeBefore);
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
index acf31b1..282156d 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinApplicationMaster.java
@@ -47,6 +47,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
@@ -82,7 +83,9 @@ public class GobblinApplicationMaster extends GobblinClusterManager {
public GobblinApplicationMaster(String applicationName, ContainerId containerId, Config config,
YarnConfiguration yarnConfiguration) throws Exception {
super(applicationName, containerId.getApplicationAttemptId().getApplicationId().toString(),
- GobblinClusterUtils.addDynamicConfig(config), Optional.<Path>absent());
+ GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+ ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))),
+ Optional.<Path>absent());
String containerLogDir = config.getString(GobblinYarnConfigurationKeys.LOGS_SINK_ROOT_DIR_KEY);
GobblinYarnLogSource gobblinYarnLogSource = new GobblinYarnLogSource();
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 10ae50c..882b1d5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -105,4 +105,7 @@ public class GobblinYarnConfigurationKeys {
//Constant definitions
public static final String GOBBLIN_YARN_LOG4J_CONFIGURATION_FILE = "log4j-yarn.properties";
public static final String JVM_USER_TIMEZONE_CONFIG = "user.timezone";
+
+ //Configuration properties relating to container mode of execution e.g. Gobblin cluster runs on Yarn
+ public static final String CONTAINER_NUM_KEY = "container.num";
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index c90306c..60e4a76 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -41,6 +41,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.Service;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
import org.apache.gobblin.cluster.GobblinClusterUtils;
@@ -53,18 +54,18 @@ import org.apache.gobblin.yarn.event.DelegationTokenUpdatedEvent;
public class GobblinYarnTaskRunner extends GobblinTaskRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
- private final ContainerId containerId;
public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
- GobblinClusterUtils.addDynamicConfig(config), appWorkDirOptional);
- this.containerId = containerId;
+ GobblinClusterUtils.addDynamicConfig(config.withValue(GobblinYarnConfigurationKeys.CONTAINER_NUM_KEY,
+ ConfigValueFactory.fromAnyRef(YarnHelixUtils.getContainerNum(containerId.toString())))), appWorkDirOptional);
}
@Override
public List<Service> getServices() {
List<Service> services = new ArrayList<>();
+ services.addAll(super.getServices());
if (this.config.hasPath(GobblinYarnConfigurationKeys.KEYTAB_FILE_PATH)) {
LOGGER.info("Adding YarnContainerSecurityManager since login is keytab based");
services.add(new YarnContainerSecurityManager(this.config, this.fs, this.eventBus));
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index c88d7fe..7b7f8f5 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -19,8 +19,6 @@ package org.apache.gobblin.yarn;
import java.io.File;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Map;
@@ -133,4 +131,14 @@ public class YarnHelixUtils {
return environmentVariableMap;
}
+
+ /**
+  * Builds a short container label from a container id string: the text following the last underscore
+  * of the id, prefixed with {@code "container-"}. When the id contains no underscore, the whole id is
+  * appended to the prefix.
+  *
+  * @param containerId the container id as a string, e.g. "container_e94_1567552810874_2132400_01_000001"
+  * @return the label for that container, e.g. "container-000001"
+  */
+ public static String getContainerNum(String containerId) {
+   int sequenceStart = containerId.lastIndexOf('_') + 1;
+   String sequenceNumber = containerId.substring(sequenceStart);
+   return "container-" + sequenceNumber;
+ }
}