You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2021/12/23 08:51:40 UTC

[hadoop] branch branch-2.10 updated: YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 205dddb  YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)
205dddb is described below

commit 205dddbfc626566d115b6074f225df2f10c41e2f
Author: Ashutosh Gupta <as...@st.niituniversity.in>
AuthorDate: Thu Dec 23 13:44:51 2021 +0530

    YARN-8234. Improve RM system metrics publisher's performance by pushing events to timeline server in batch (#3793)
    
    Signed-off-by: Akira Ajisaka <aa...@apache.org>
    (cherry picked from commit 00e2405fbd598602fdccf31826948c198c053fa5)
    
     Conflicts:
    	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  14 ++
 .../src/main/resources/yarn-default.xml            |  27 +++
 .../metrics/TimelineServiceV1Publisher.java        | 201 +++++++++++++++++++--
 .../metrics/TestSystemMetricsPublisher.java        |  46 +++--
 4 files changed, 261 insertions(+), 27 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 08f89241..a2b4f3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -618,6 +618,20 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
 
+  public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
+      RM_PREFIX + "system-metrics-publisher.timeline-server-v1.batch-size";
+  public static final int
+      DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE =
+      1000;
+  public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+      RM_PREFIX + "system-metrics-publisher.timeline-server-v1.interval-seconds";
+  public static final int DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL =
+      60;
+  public static final String RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
+      RM_PREFIX + "system-metrics-publisher.timeline-server-v1.enable-batch";
+  public static final boolean DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED =
+      false;
+
   //RM delegation token related keys
   public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
     RM_PREFIX + "delegation.key.update-interval";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7d4f04a..6b964fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -862,6 +862,33 @@
   </property>
 
   <property>
+    <description>
+      This setting enables or disables batch publishing of timeline events by the timeline server v1 publisher.
+    </description>
+    <name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.enable-batch</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>
+      The batch size, i.e. the number of events the timeline server v1 publisher sends in one request.
+    </description>
+    <name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.batch-size</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>
+      When batch publishing is enabled in timeline server v1, we must avoid the
+      publisher waiting for a batch to fill up and holding events in the buffer for a
+      long time. So we add another thread which sends the buffered events periodically.
+      This config sets the interval, in seconds, of that periodic sending thread.
+    </description>
+    <name>yarn.resourcemanager.system-metrics-publisher.timeline-server-v1.interval-seconds</name>
+    <value>60</value>
+  </property>
+
+  <property>
     <description>Number of diagnostics/failure messages can be saved in RM for
     log aggregation. It also defines the number of diagnostics/failure
     messages can be shown in log aggregation web ui.</description>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
index 1e12287..10c8e7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TimelineServiceV1Publisher.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
@@ -44,6 +48,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is responsible for posting application, appattempt &amp; Container
@@ -51,17 +57,63 @@ import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
  */
 public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
 
-  private static final Log LOG =
-      LogFactory.getLog(TimelineServiceV1Publisher.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TimelineServiceV1Publisher.class);
 
   public TimelineServiceV1Publisher() {
     super("TimelineserviceV1Publisher");
   }
 
   private TimelineClient client;
+  private LinkedBlockingQueue<TimelineEntity> entityQueue;
+  private ExecutorService sendEventThreadPool;
+  private int dispatcherPoolSize;
+  private int dispatcherBatchSize;
+  private int putEventInterval;
+  private boolean isTimeLineServerBatchEnabled;
+  private volatile boolean stopped = false;
+  private PutEventThread putEventThread;
+  private Object sendEntityLock;
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
+    isTimeLineServerBatchEnabled =
+        conf.getBoolean(
+            YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+            YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED);
+    if (isTimeLineServerBatchEnabled) {
+      putEventInterval =
+          conf.getInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+              YarnConfiguration.DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL)
+              * 1000;
+      if (putEventInterval <= 0) {
+        throw new IllegalArgumentException(
+            "RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL should be greater than 0");
+      }
+      dispatcherPoolSize = conf.getInt(
+          YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE);
+      if (dispatcherPoolSize <= 0) {
+        throw new IllegalArgumentException(
+            "RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE should be greater than 0");
+      }
+      dispatcherBatchSize = conf.getInt(
+          YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE,
+          YarnConfiguration.
+              DEFAULT_RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE);
+      if (dispatcherBatchSize <= 1) {
+        throw new IllegalArgumentException(
+            "RM_TIMELINE_SERVER_V1_PUBLISHER_DISPATCHER_BATCH_SIZE should be greater than 1");
+      }
+      putEventThread = new PutEventThread();
+      sendEventThreadPool = Executors.newFixedThreadPool(dispatcherPoolSize);
+      entityQueue = new LinkedBlockingQueue<>(dispatcherBatchSize + 1);
+      sendEntityLock = new Object();
+      LOG.info("Timeline service v1 batch publishing enabled");
+    } else {
+      LOG.info("Timeline service v1 batch publishing disabled");
+    }
     client = TimelineClient.createTimelineClient();
     addIfService(client);
     super.serviceInit(conf);
@@ -69,6 +121,36 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
         new TimelineV1EventHandler());
   }
 
+  protected void serviceStart() throws Exception {
+    if (isTimeLineServerBatchEnabled) {
+      stopped = false;
+      putEventThread.start();
+    }
+    super.serviceStart();
+  }
+
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    if (isTimeLineServerBatchEnabled) {
+      stopped = true;
+      putEventThread.interrupt();
+      try {
+        putEventThread.join();
+        SendEntity task = new SendEntity();
+        if (!task.buffer.isEmpty()) {
+          LOG.info("Initiating final putEntities, remaining entities left in entityQueue: {}",
+              task.buffer.size());
+          sendEventThreadPool.submit(task);
+        }
+      } finally {
+        sendEventThreadPool.shutdown();
+        if (!sendEventThreadPool.awaitTermination(3, TimeUnit.SECONDS)) {
+          sendEventThreadPool.shutdownNow();
+        }
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void appCreated(RMApp app, long createdTime) {
@@ -244,7 +326,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
   @SuppressWarnings("unchecked")
   @Override
   public void appAttemptFinished(RMAppAttempt appAttempt,
-      RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
+      RMAppAttemptState appAttemptState, RMApp app, long finishedTime) {
     TimelineEntity entity =
         createAppAttemptEntity(appAttempt.getAppAttemptId());
 
@@ -261,7 +343,7 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
     eventInfo.put(AppAttemptMetricsConstants.FINAL_STATUS_INFO,
         app.getFinalApplicationStatus().toString());
     eventInfo.put(AppAttemptMetricsConstants.STATE_INFO, RMServerUtils
-        .createApplicationAttemptState(appAttemtpState).toString());
+        .createApplicationAttemptState(appAttemptState).toString());
     if (appAttempt.getMasterContainer() != null) {
       eventInfo.put(AppAttemptMetricsConstants.MASTER_CONTAINER_INFO,
           appAttempt.getMasterContainer().getId().toString());
@@ -361,23 +443,68 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
   }
 
   private void putEntity(TimelineEntity entity) {
-    try {
+    if (isTimeLineServerBatchEnabled) {
+      try {
+        entityQueue.put(entity);
+        if (entityQueue.size() > dispatcherBatchSize) {
+          SendEntity task = null;
+          synchronized (sendEntityLock) {
+            if (entityQueue.size() > dispatcherBatchSize) {
+              task = new SendEntity();
+            }
+          }
+          if (task != null) {
+            sendEventThreadPool.submit(task);
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity batch  [ " + entity.getEntityType() + ","
+            + entity.getEntityId() + " ] ", e);
+      }
+    } else {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Publishing the entity " + entity.getEntityId()
+              + ", JSON-style content: "
+              + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        }
+        client.putEntities(entity);
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity [ " + entity.getEntityType() + ","
+            + entity.getEntityId() + " ] ", e);
+      }
+    }
+  }
+
+  private class SendEntity implements Runnable {
+
+    private ArrayList<TimelineEntity> buffer;
+
+    SendEntity() {
+      buffer = new ArrayList();
+      entityQueue.drainTo(buffer);
+    }
+
+    @Override
+    public void run() {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Publishing the entity " + entity.getEntityId()
-            + ", JSON-style content: "
-            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+        LOG.debug("Number of timeline entities being sent in batch: {}", buffer.size());
+      }
+      if (buffer.isEmpty()) {
+        return;
+      }
+      try {
+        client.putEntities(buffer.toArray(new TimelineEntity[0]));
+      } catch (Exception e) {
+        LOG.error("Error when publishing entity: ", e);
       }
-      client.putEntities(entity);
-    } catch (Exception e) {
-      LOG.error("Error when publishing entity [" + entity.getEntityType() + ","
-          + entity.getEntityId() + "]", e);
     }
   }
 
   private class TimelineV1PublishEvent extends TimelinePublishEvent {
     private TimelineEntity entity;
 
-    public TimelineV1PublishEvent(SystemMetricsEventType type,
+    TimelineV1PublishEvent(SystemMetricsEventType type,
         TimelineEntity entity, ApplicationId appId) {
       super(type, appId);
       this.entity = entity;
@@ -395,4 +522,46 @@ public class TimelineServiceV1Publisher extends AbstractSystemMetricsPublisher {
       putEntity(event.getEntity());
     }
   }
-}
+
+  private class PutEventThread extends Thread {
+    PutEventThread() {
+      super("PutEventThread");
+    }
+
+    @Override
+    public void run() {
+      LOG.info("System metrics publisher will put events every " +
+          String.valueOf(putEventInterval) + " milliseconds");
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        if (System.currentTimeMillis() % putEventInterval >= 1000) {
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            LOG.warn(SystemMetricsPublisher.class.getName()
+                + " is interrupted. Exiting.");
+            break;
+          }
+          continue;
+        }
+        SendEntity task = null;
+        synchronized (sendEntityLock) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating SendEntity task in PutEventThread");
+          }
+          task = new SendEntity();
+        }
+        if (task != null) {
+          sendEventThreadPool.submit(task);
+        }
+        try {
+          // sleep added to avoid multiple SendEntity tasks within a single interval.
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          LOG.warn(SystemMetricsPublisher.class.getName()
+              + " is interrupted. Exiting.");
+          break;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
index 7201d2a..30cfe6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/TestSystemMetricsPublisher.java
@@ -18,9 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -29,6 +27,14 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -64,19 +70,33 @@ import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(Parameterized.class)
 public class TestSystemMetricsPublisher {
 
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {{false, 0}, {true, 1}});
+  }
+
   private static ApplicationHistoryServer timelineServer;
   private static TimelineServiceV1Publisher metricsPublisher;
   private static TimelineStore store;
 
-  @BeforeClass
-  public static void setup() throws Exception {
+  private boolean rmTimelineServerV1PublisherBatchEnabled;
+  private int rmTimelineServerV1PublisherInterval;
+
+  public TestSystemMetricsPublisher(boolean rmTimelineServerV1PublisherBatchEnabled,
+      int rmTimelineServerV1PublisherInterval) {
+    this.rmTimelineServerV1PublisherBatchEnabled = rmTimelineServerV1PublisherBatchEnabled;
+    this.rmTimelineServerV1PublisherInterval = rmTimelineServerV1PublisherInterval;
+  }
+
+  @Before
+  public void setup() throws Exception {
     YarnConfiguration conf = new YarnConfiguration();
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
@@ -87,6 +107,10 @@ public class TestSystemMetricsPublisher {
     conf.setInt(
         YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
         2);
+    conf.setBoolean(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_BATCH_ENABLED,
+        rmTimelineServerV1PublisherBatchEnabled);
+    conf.setInt(YarnConfiguration.RM_TIMELINE_SERVER_V1_PUBLISHER_INTERVAL,
+        rmTimelineServerV1PublisherInterval);
 
     timelineServer = new ApplicationHistoryServer();
     timelineServer.init(conf);
@@ -98,8 +122,8 @@ public class TestSystemMetricsPublisher {
     metricsPublisher.start();
   }
 
-  @AfterClass
-  public static void tearDown() throws Exception {
+  @After
+  public void tearDown() throws Exception {
     if (metricsPublisher != null) {
       metricsPublisher.stop();
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org