You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vr...@apache.org on 2019/04/05 19:07:03 UTC
[hadoop] branch trunk updated: YARN-9335 [atsv2] Restrict the
number of elements held in timeline collector when backend is unreachable
for async calls. Contributed by Abhishek Modi.
This is an automated email from the ASF dual-hosted git repository.
vrushali pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22362c8 YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi.
22362c8 is described below
commit 22362c876d28c081c37dd74f6f1ae8139695e254
Author: Vrushali C <vr...@apache.org>
AuthorDate: Fri Apr 5 12:06:51 2019 -0700
YARN-9335 [atsv2] Restrict the number of elements held in timeline collector when backend is unreachable for async calls. Contributed by Abhishek Modi.
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 9 +++++
.../src/main/resources/yarn-default.xml | 7 ++++
.../collector/TimelineCollector.java | 24 +++++++++++-
.../collector/TestTimelineCollector.java | 43 ++++++++++++++++++++--
4 files changed, 79 insertions(+), 4 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index fa75eb4..34f1e93 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2767,6 +2767,15 @@ public class YarnConfiguration extends Configuration {
public static final int
DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+ /** The setting that controls the capacity of the queue for async writes
+ * to timeline collector.
+ */
+ public static final String TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY =
+ TIMELINE_SERVICE_PREFIX + "writer.async.queue.capacity";
+
+ public static final int
+ DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY = 100;
+
/**
* The name for setting that controls how long the final value of
* a metric of a completed app is retained before merging
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index dfbffd4..004af7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2591,6 +2591,13 @@
</property>
<property>
+ <description>The setting that decides the capacity of the queue to hold
+ asynchronous timeline entities.</description>
+ <name>yarn.timeline-service.writer.async.queue.capacity</name>
+ <value>100</value>
+ </property>
+
+ <property>
<description>Time period till which the application collector will be alive
in NM, after the application master container finishes.</description>
<name>yarn.timeline-service.app-collector.linger-period.ms</name>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 6c83665..0c54ed0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -23,8 +23,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -37,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +65,7 @@ public abstract class TimelineCollector extends CompositeService {
= new ConcurrentHashMap<>();
private static Set<String> entityTypesSkipAggregation
= new HashSet<>();
+ private ThreadPoolExecutor pool;
private volatile boolean readyToAggregate = false;
@@ -73,6 +78,14 @@ public abstract class TimelineCollector extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
+ int capacity = conf.getInt(
+ YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+ YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY
+ );
+ pool = new ThreadPoolExecutor(1, 1, 3, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(capacity));
+ pool.setRejectedExecutionHandler(
+ new ThreadPoolExecutor.DiscardOldestPolicy());
}
@Override
@@ -83,6 +96,7 @@ public abstract class TimelineCollector extends CompositeService {
@Override
protected void serviceStop() throws Exception {
isStopped = true;
+ pool.shutdownNow();
super.serviceStop();
}
@@ -213,7 +227,15 @@ public abstract class TimelineCollector extends CompositeService {
LOG.debug("putEntitiesAsync(entities={}, callerUgi={})", entities,
callerUgi);
- writeTimelineEntities(entities, callerUgi);
+ pool.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ writeTimelineEntities(entities, callerUgi);
+ } catch (IOException ie) {
+ LOG.error("Got an exception while writing entity", ie);
+ }
+ }
+ });
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
index f951540..8051792 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollector.java
@@ -27,11 +27,15 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector.AggregationStatusTable;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.junit.Test;
import com.google.common.collect.Sets;
+import org.mockito.internal.stubbing.answers.AnswersWithDelay;
+import org.mockito.internal.stubbing.answers.Returns;
import java.io.IOException;
import java.util.HashSet;
@@ -46,6 +50,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestTimelineCollector {
@@ -165,17 +170,49 @@ public class TestTimelineCollector {
* putEntityAsync() calls.
*/
@Test
- public void testPutEntityAsync() throws IOException {
+ public void testPutEntityAsync() throws Exception {
TimelineWriter writer = mock(TimelineWriter.class);
TimelineCollector collector = new TimelineCollectorForTest(writer);
-
+ collector.init(new Configuration());
+ collector.start();
TimelineEntities entities = generateTestEntities(1, 1);
collector.putEntitiesAsync(
entities, UserGroupInformation.createRemoteUser("test-user"));
-
+ Thread.sleep(1000);
verify(writer, times(1)).write(any(TimelineCollectorContext.class),
any(TimelineEntities.class), any(UserGroupInformation.class));
verify(writer, never()).flush();
+ collector.stop();
+ }
+
+ /**
+ * Test TimelineCollector's discarding entities in case of async writes if
+ * write is taking too much time.
+ */
+ @Test
+ public void testAsyncEntityDiscard() throws Exception {
+ TimelineWriter writer = mock(TimelineWriter.class);
+
+ when(writer.write(any(), any(), any())).thenAnswer(
+ new AnswersWithDelay(500, new Returns(new TimelineWriteResponse())));
+ TimelineCollector collector = new TimelineCollectorForTest(writer);
+ Configuration config = new Configuration();
+ config
+ .setInt(YarnConfiguration.TIMELINE_SERVICE_WRITER_ASYNC_QUEUE_CAPACITY,
+ 3);
+ collector.init(config);
+ collector.start();
+ for (int i = 0; i < 10; ++i) {
+ TimelineEntities entities = generateTestEntities(i + 1, 1);
+ collector.putEntitiesAsync(entities,
+ UserGroupInformation.createRemoteUser("test-user"));
+ }
+ Thread.sleep(3000);
+ verify(writer, times(4))
+ .write(any(TimelineCollectorContext.class), any(TimelineEntities.class),
+ any(UserGroupInformation.class));
+ verify(writer, never()).flush();
+ collector.stop();
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org