You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/04/05 19:07:03 UTC

[hadoop] branch trunk updated: YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishesk Modi.

This is an automated email from the ASF dual-hosted git repository.

vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22362c8  YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishesk Modi.
22362c8 is described below

commit 22362c876d28c081c37dd74f6f1ae8139695e254
Author: Vrushali C <vr...@apache.org>
AuthorDate: Fri Apr 5 12:06:51 2019 -0700

    YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishesk Modi.
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  9 +++++
 .../src/main/resources/yarn-default.xml            |  7 ++++
 .../collector/TimelineCollector.java               | 24 +++++++++++-
 .../collector/TestTimelineCollector.java           | 43 ++++++++++++++++++++--
 4 files changed, 79 insertions(+), 4 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fa75eb4..34f1e93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2767,6 +2767,15 @@ public class YarnConfiguration extends Configuration {
   public static final int
       DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
 
+  /** The setting that controls the capacity of the queue for async writes
+   * to timeline collector.
+   */
+  public static final String TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY =
+      TIMELINE_SERVICE_PREFIX + "writer.async.queue.capacity";
+
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = 100;
+
   /**
    * The name for setting that controls how long the final value of
    * a metric of a completed app is retained before merging
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dfbffd4..004af7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2591,6 +2591,13 @@
   </property>
 
   <property>
+    <description>The setting that decides the capacity of the queue to hold
+    asynchronous timeline entities.</description>
+    <name>yarn.timeline-service.writer.async.queue.capacity</name>
+    <value>100</value>
+  </property>
+
+  <property>
     <description>Time period till which the application collector will be alive
      in NM, after the  application master container finishes.</description>
     <name>yarn.timeline-service.app-collector.linger-period.ms</name>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 6c83665..0c54ed0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -23,8 +23,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +65,7 @@ public abstract class TimelineCollector extends CompositeService {
       = new ConcurrentHashMap<>();
   private static Set<String> entityTypesSkipAggregation
       = new HashSet<>();
+  private ThreadPoolExecutor pool;
 
   private volatile boolean readyToAggregate = false;
 
@@ -73,6 +78,14 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
+    int capacity = conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY
+    );
+    pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
+        new ArrayBlockingQueue<>(capacity));
+    pool.setRejectedExecutionHandler(
+        new ThreadPoolExecutor.DiscardOldestPolicy());
   }
 
   @Override
@@ -83,6 +96,7 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
     isStopped = true;
+    pool.shutdownNow();
     super.serviceStop();
   }
 
@@ -213,7 +227,15 @@ public abstract class TimelineCollector extends CompositeService {
     LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities,
         callerUgi);
 
-    writeTimelineEntities(entities, callerUgi);
+    pool.execute(new Runnable() {
+      @Override public void run() {
+        try {
+          writeTimelineEntities(entities, callerUgi);
+        } catch (IOException ie) {
+          LOG.error("Got an exception while writing entity", ie);
+        }
+      }
+    });
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index f951540..8051792 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -27,11 +27,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 import org.junit.Test;
 
 import com.google.common.collect.Sets;
+import org.mockito.internal.stubbing.answers.AnswersWithDelay;
+import org.mockito.internal.stubbing.answers.Returns;
 
 import java.io.IOException;
 import java.util.HashSet;
@@ -46,6 +50,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestTimelineCollector {
 
@@ -165,17 +170,49 @@ public class TestTimelineCollector {
    * putEntityAsync() calls.
    */
   @Test
-  public void testPutEntityAsync() throws IOException {
+  public void testPutEntityAsync() throws Exception {
     TimelineWriter writer = mock(TimelineWriter.class);
     TimelineCollector collector = new TimelineCollectorForTest(writer);
-
+    collector.init(new Configuration());
+    collector.start();
     TimelineEntities entities = generateTestEntities(1, 1);
     collector.putEntitiesAsync(
         entities, UserGroupInformation.createRemoteUser("test-user"));
-
+    Thread.sleep(1000);
     verify(writer, times(1)).write(any(TimelineCollectorContext.class),
         any(TimelineEntities.class), any(UserGroupInformation.class));
     verify(writer, never()).flush();
+    collector.stop();
+  }
+
+  /**
+   * Test TimelineCollector's discarding entities in case of async writes if
+   * write is taking too much time.
+   */
+  @Test
+  public void testAsyncEntityDiscard() throws Exception {
+    TimelineWriter writer = mock(TimelineWriter.class);
+
+    when(writer.write(any(), any(), any())).thenAnswer(
+        new AnswersWithDelay(500, new Returns(new TimelineWriteResponse())));
+    TimelineCollector collector = new TimelineCollectorForTest(writer);
+    Configuration config = new Configuration();
+    config
+        .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+            3);
+    collector.init(config);
+    collector.start();
+    for (int i = 0; i < 10; ++i) {
+      TimelineEntities entities = generateTestEntities(i + 1, 1);
+      collector.putEntitiesAsync(entities,
+          UserGroupInformation.createRemoteUser("test-user"));
+    }
+    Thread.sleep(3000);
+    verify(writer, times(4))
+        .write(any(TimelineCollectorContext.class), any(TimelineEntities.class),
+            any(UserGroupInformation.class));
+    verify(writer, never()).flush();
+    collector.stop();
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org