You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/03/30 20:00:35 UTC

ambari git commit: AMBARI-15621 : Cluster Second aggregator taking more than 2 mins to execute on large clusters, thereby causing lag (avijayan)

Repository: ambari
Updated Branches:
  refs/heads/branch-2.2 77e1988bb -> d0b96cd95


AMBARI-15621 : Cluster Second aggregator taking more than 2 mins to execute on large clusters, thereby causing lag (avijayan)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/d0b96cd9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/d0b96cd9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/d0b96cd9

Branch: refs/heads/branch-2.2
Commit: d0b96cd95f5a5c9e3d672e820b2551a85c8ca41e
Parents: 77e1988
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Wed Mar 30 10:53:44 2016 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Wed Mar 30 10:53:44 2016 -0700

----------------------------------------------------------------------
 .../timeline/HBaseTimelineMetricStore.java      | 19 +++--
 .../aggregators/AbstractTimelineAggregator.java | 82 +++++++++-----------
 .../AbstractTimelineAggregatorTest.java         | 48 ++++++------
 .../stacks/HDP/2.0.6/services/stack_advisor.py  |  2 +
 4 files changed, 71 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 2f080e3..a32e206 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -95,37 +95,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
       // Start the cluster aggregator second
       TimelineMetricAggregator secondClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
-      scheduleAggregatorThread(secondClusterAggregator, metricsConf);
+      scheduleAggregatorThread(secondClusterAggregator);
 
       // Start the minute cluster aggregator
       TimelineMetricAggregator minuteClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(minuteClusterAggregator, metricsConf);
+      scheduleAggregatorThread(minuteClusterAggregator);
 
       // Start the hourly cluster aggregator
       TimelineMetricAggregator hourlyClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(hourlyClusterAggregator, metricsConf);
+      scheduleAggregatorThread(hourlyClusterAggregator);
 
       // Start the daily cluster aggregator
       TimelineMetricAggregator dailyClusterAggregator =
         TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(dailyClusterAggregator, metricsConf);
+      scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
       TimelineMetricAggregator minuteHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(minuteHostAggregator, metricsConf);
+      scheduleAggregatorThread(minuteHostAggregator);
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(hourlyHostAggregator, metricsConf);
+      scheduleAggregatorThread(hourlyHostAggregator);
 
       // Start the daily host aggregator
       TimelineMetricAggregator dailyHostAggregator =
         TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
-      scheduleAggregatorThread(dailyHostAggregator, metricsConf);
+      scheduleAggregatorThread(dailyHostAggregator);
 
       if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
         int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
@@ -333,12 +333,11 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
     return metricMetadataManager.getHostedAppsCache();
   }
 
-  private void scheduleAggregatorThread(TimelineMetricAggregator aggregator,
-                                        Configuration metricsConf) {
+  private void scheduleAggregatorThread(TimelineMetricAggregator aggregator) {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     if (!aggregator.isDisabled()) {
       executorService.scheduleAtFixedRate(aggregator,
-        SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)),
+        0l,
         aggregator.getSleepIntervalMillis(),
         TimeUnit.MILLISECONDS);
     }

http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index f8ec516..c576a40 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -52,7 +52,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
   protected String tableName;
   protected String outputTableName;
   protected Long nativeTimeRangeDelay;
-  protected Long lastAggregatedEndTime = -1l;
 
   // Explicitly name aggregators for logging needs
   private final String aggregatorName;
@@ -93,18 +92,19 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     LOG.info("Started Timeline aggregator thread @ " + new Date());
     Long SLEEP_INTERVAL = getSleepIntervalMillis();
     runOnce(SLEEP_INTERVAL);
-    this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL;
   }
 
   /**
    * Access relaxed for tests
    */
   public void runOnce(Long SLEEP_INTERVAL) {
-    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun();
+
+    long currentTime = System.currentTimeMillis();
+    long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
 
     if (lastCheckPointTime != -1) {
       LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
-        + ((lastAggregatedEndTime - lastCheckPointTime) / 1000)
+        + ((currentTime - lastCheckPointTime) / 1000)
         + " seconds.");
 
       boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
@@ -120,40 +120,40 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     }
   }
 
-  private long readLastCheckpointSavingOnFirstRun() {
+  private long readLastCheckpointSavingOnFirstRun(long currentTime) {
     long lastCheckPointTime = -1;
 
     try {
       lastCheckPointTime = readCheckPoint();
-      LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
-
-      if (lastAggregatedEndTime == -1l) {
-        lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
-      }
-
-      if (isLastCheckPointTooOld(lastCheckPointTime)) {
-        LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
-          "lastCheckPointTime = " + new Date(lastCheckPointTime));
-        lastCheckPointTime = -1;
-      }
-
-      if (lastCheckPointTime > 0) {
-        lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
-        LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
-      }
-
-      if (isLastCheckPointTooYoung(lastCheckPointTime)) {
-        LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
-        lastCheckPointTime = -1;
-      }
-
-      if (lastCheckPointTime == -1) {
-        // Assuming first run, save checkpoint and sleep.
-        // Set checkpoint to rounded time in the past to allow the
-        // agents/collectors to catch up
-        LOG.info("Saving checkpoint time on first run. " +
-          new Date((lastAggregatedEndTime)));
-        saveCheckPoint(lastAggregatedEndTime);
+      if (lastCheckPointTime != -1) {
+        LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
+        if (isLastCheckPointTooOld(currentTime, lastCheckPointTime)) {
+          LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
+            "lastCheckPointTime = " + new Date(lastCheckPointTime));
+          lastCheckPointTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis()) - getSleepIntervalMillis();
+          LOG.info("Saving checkpoint time. " + new Date((lastCheckPointTime)));
+          saveCheckPoint(lastCheckPointTime);
+
+        } else {
+
+          if (lastCheckPointTime > 0) {
+            lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
+            LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
+          }
+
+          if (isLastCheckPointTooYoung(lastCheckPointTime)) {
+            LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
+            return -1; //Skip Aggregation this time around
+          }
+        }
+      } else {
+        /*
+          No checkpoint. Save current rounded checkpoint and sleep for 1 cycle.
+         */
+        LOG.info("No checkpoint found");
+        long firstCheckPoint = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
+        LOG.info("Saving checkpoint time. " + new Date((firstCheckPoint)));
+        saveCheckPoint(firstCheckPoint);
       }
     } catch (IOException io) {
       LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
@@ -161,16 +161,16 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     return lastCheckPointTime;
   }
 
-  private boolean isLastCheckPointTooOld(long checkpoint) {
+  private boolean isLastCheckPointTooOld(long currentTime, long checkpoint) {
     // first checkpoint is saved checkpointDelayMillis in the past,
     // so here we also need to take it into account
     return checkpoint != -1 &&
-      ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis());
+      ((currentTime - checkpoint) > getCheckpointCutOffIntervalMillis());
   }
 
   private boolean isLastCheckPointTooYoung(long checkpoint) {
     return checkpoint != -1 &&
-      ((lastAggregatedEndTime <= checkpoint));
+      ((getRoundedAggregateTimeMillis(getSleepIntervalMillis()) <= checkpoint));
   }
 
   protected long readCheckPoint() {
@@ -295,14 +295,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
     return checkpointLocation;
   }
 
-  protected void setLastAggregatedEndTime(long lastAggregatedEndTime) {
-    this.lastAggregatedEndTime = lastAggregatedEndTime;
-  }
-
-  protected long getLastAggregatedEndTime() {
-    return lastAggregatedEndTime;
-  }
-
   public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
     return referenceTime - (referenceTime % aggregatorPeriod);
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 21b9839..827f399 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.HBaseTimelineMetricStore;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.junit.Before;
 import org.junit.Test;
@@ -114,45 +115,42 @@ public class AbstractTimelineAggregatorTest {
     long currentTime = System.currentTimeMillis();
     long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
-    
+
     //Test first run of aggregator with no checkpoint
     checkPoint.set(-1);
-    agg.setLastAggregatedEndTime(-1l);
     agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
     assertEquals(roundedOffAggregatorTime, checkPoint.get());
     assertEquals("Do not aggregate on first run", 0, actualRuns);
 
-    //Test first run with Too Old checkpoint
+//    //Test first run with too "recent" checkpoint
     currentTime = System.currentTimeMillis();
-    checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
-    agg.setLastAggregatedEndTime(-1l);
+    checkPoint.set(currentTime);
+    agg.setSleepIntervalMillis(sleepIntervalMillis);
     agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
     assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals(roundedOffAggregatorTime, checkPoint.get());
     assertEquals("Do not aggregate on first run", 0, actualRuns);
 
-    //Test first run with too "recent" checkpoint
+    //Test first run with Too Old checkpoint
     currentTime = System.currentTimeMillis();
-    checkPoint.set(currentTime);
-    agg.setLastAggregatedEndTime(-1l);
-    agg.setSleepIntervalMillis(sleepIntervalMillis);
+    checkPoint.set(currentTime - 16*60*1000); //Old checkpoint
     agg.runOnce(sleepIntervalMillis);
-    assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
-    assertEquals("endTime  should be zero", 0, endTimeInDoWork.get());
-    assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get());
-    assertEquals("Do not aggregate on first run", 0, actualRuns);
+    long checkPointTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(sleepIntervalMillis);
+    assertEquals("startTime should be zero", checkPointTime - sleepIntervalMillis, startTimeInDoWork.get());
+    assertEquals("endTime  should be zero", checkPointTime, endTimeInDoWork.get());
+    assertEquals(roundedOffAggregatorTime, checkPoint.get());
+    assertEquals("Do not aggregate on first run", 1, actualRuns);
+
 
-    //Test first run with perfect checkpoint (sleepIntervalMillis back)
+//    //Test first run with perfect checkpoint (sleepIntervalMillis back)
     currentTime = System.currentTimeMillis();
     roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime,
       sleepIntervalMillis);
-    long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
+    checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
     long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
     checkPoint.set(checkPointTime);
-    agg.setLastAggregatedEndTime(-1l);
     agg.runOnce(sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
       expectedCheckPoint, startTimeInDoWork.get());
@@ -160,20 +158,20 @@ public class AbstractTimelineAggregatorTest {
       expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
     assertEquals(expectedCheckPoint + sleepIntervalMillis,
       checkPoint.get());
-    assertEquals("Aggregate on first run", 1, actualRuns);
+    assertEquals("Aggregate on first run", 2, actualRuns);
 
     //Test edge case for checkpoint (2 x sleepIntervalMillis)
-    checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis;
-    expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
-    checkPoint.set(checkPointTime);
+    currentTime = System.currentTimeMillis();
+    checkPoint.set(currentTime - 2*sleepIntervalMillis + 5000);
     agg.runOnce(sleepIntervalMillis);
+    long expectedStartTime = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(currentTime - 2*sleepIntervalMillis + 5000, sleepIntervalMillis);
     assertEquals("startTime should the lower rounded time of the checkpoint time",
-      expectedCheckPoint, startTimeInDoWork.get());
+      expectedStartTime, startTimeInDoWork.get());
     assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
-      expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
-    assertEquals(expectedCheckPoint + sleepIntervalMillis,
+      expectedStartTime + sleepIntervalMillis, endTimeInDoWork.get());
+    assertEquals(expectedStartTime + sleepIntervalMillis,
       checkPoint.get());
-    assertEquals("Aggregate on second run", 2, actualRuns);
+    assertEquals("Aggregate on second run", 3, actualRuns);
 
 
  }

http://git-wip-us.apache.org/repos/asf/ambari/blob/d0b96cd9/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index b8e2c45..a007817 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -592,12 +592,14 @@ class HDP206StackAdvisor(DefaultStackAdvisor):
         putAmsHbaseSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 20)
         putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 81920000)
         putAmsSiteProperty("phoenix.query.maxGlobalMemoryPercentage", 30)
+        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 10000)
       elif total_sinks_count >= 500:
         putAmsHbaseSiteProperty("hbase.regionserver.handler.count", 60)
         putAmsHbaseSiteProperty("hbase.regionserver.hlog.blocksize", 134217728)
         putAmsHbaseSiteProperty("hbase.regionserver.maxlogs", 64)
         putAmsHbaseSiteProperty("hbase.hregion.memstore.flush.size", 268435456)
         putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 40960000)
+        putAmsSiteProperty("timeline.metrics.service.resultset.fetchSize", 5000)
       else:
         putAmsHbaseSiteProperty("phoenix.coprocessor.maxMetaDataCacheSize", 20480000)
       pass