You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2016/03/04 19:58:10 UTC
ambari git commit: AMBARI-15267 : Metrics aggregate times should be
tied to aggregation period instead of AMS start time (avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-2.2 07da1862a -> bda06cfbf
AMBARI-15267 : Metrics aggregate times should be tied to aggregation period instead of AMS start time (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/bda06cfb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/bda06cfb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/bda06cfb
Branch: refs/heads/branch-2.2
Commit: bda06cfbf8a742b9dbe9a0b1fc0c78dd7f2e4e63
Parents: 07da186
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Fri Mar 4 10:57:43 2016 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Fri Mar 4 10:57:54 2016 -0800
----------------------------------------------------------------------
.../timeline/HBaseTimelineMetricStore.java | 50 ++---
.../aggregators/AbstractTimelineAggregator.java | 117 +++++------
.../aggregators/TimelineClusterMetric.java | 4 +
.../aggregators/TimelineMetricAggregator.java | 9 +-
.../TimelineMetricClusterAggregator.java | 8 +-
.../TimelineMetricClusterAggregatorSecond.java | 6 +-
.../TimelineMetricHostAggregator.java | 6 +-
.../v2/TimelineMetricClusterAggregator.java | 2 +-
.../v2/TimelineMetricHostAggregator.java | 2 +-
.../timeline/query/PhoenixTransactSQL.java | 4 +-
.../AbstractTimelineAggregatorTest.java | 204 +++++--------------
11 files changed, 157 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index 37e4796..f460292 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -48,6 +48,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.AGGREGATOR_CHECKPOINT_DELAY;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
public class HBaseTimelineMetricStore extends AbstractService implements TimelineMetricStore {
@@ -92,58 +94,37 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
// Start the cluster aggregator second
TimelineMetricAggregator secondClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(hBaseAccessor, metricsConf, metricMetadataManager);
- if (!secondClusterAggregator.isDisabled()) {
- Thread aggregatorThread = new Thread(secondClusterAggregator);
- aggregatorThread.start();
- }
+ scheduleAggregatorThread(secondClusterAggregator, metricsConf);
- // Start the minute cluster aggregator
+// // Start the minute cluster aggregator
TimelineMetricAggregator minuteClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(hBaseAccessor, metricsConf);
- if (!minuteClusterAggregator.isDisabled()) {
- Thread aggregatorThread = new Thread(minuteClusterAggregator);
- aggregatorThread.start();
- }
+ scheduleAggregatorThread(minuteClusterAggregator, metricsConf);
// Start the hourly cluster aggregator
TimelineMetricAggregator hourlyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(hBaseAccessor, metricsConf);
- if (!hourlyClusterAggregator.isDisabled()) {
- Thread aggregatorThread = new Thread(hourlyClusterAggregator);
- aggregatorThread.start();
- }
+ scheduleAggregatorThread(hourlyClusterAggregator, metricsConf);
// Start the daily cluster aggregator
TimelineMetricAggregator dailyClusterAggregator =
TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(hBaseAccessor, metricsConf);
- if (!dailyClusterAggregator.isDisabled()) {
- Thread aggregatorThread = new Thread(dailyClusterAggregator);
- aggregatorThread.start();
- }
+ scheduleAggregatorThread(dailyClusterAggregator, metricsConf);
// Start the minute host aggregator
TimelineMetricAggregator minuteHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
- if (!minuteHostAggregator.isDisabled()) {
- Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
- minuteAggregatorThread.start();
- }
+ scheduleAggregatorThread(minuteHostAggregator, metricsConf);
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
- if (!hourlyHostAggregator.isDisabled()) {
- Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
- aggregatorHourlyThread.start();
- }
+ scheduleAggregatorThread(hourlyHostAggregator, metricsConf);
// Start the daily host aggregator
TimelineMetricAggregator dailyHostAggregator =
TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(hBaseAccessor, metricsConf);
- if (!dailyHostAggregator.isDisabled()) {
- Thread aggregatorDailyThread = new Thread(dailyHostAggregator);
- aggregatorDailyThread.start();
- }
+ scheduleAggregatorThread(dailyHostAggregator, metricsConf);
if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
@@ -350,4 +331,15 @@ public class HBaseTimelineMetricStore extends AbstractService implements Timelin
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
return metricMetadataManager.getHostedAppsCache();
}
+
+ private void scheduleAggregatorThread(TimelineMetricAggregator aggregator,
+ Configuration metricsConf) {
+ ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
+ if (!aggregator.isDisabled()) {
+ executorService.scheduleAtFixedRate(aggregator,
+ SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120)),
+ aggregator.getSleepIntervalMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index fce5a39..f8ec516 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
import java.io.File;
@@ -44,7 +42,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public abstract class AbstractTimelineAggregator implements TimelineMetricAggregator {
protected final PhoenixHBaseAccessor hBaseAccessor;
protected final Logger LOG;
- private Clock clock;
protected final long checkpointDelayMillis;
protected final Integer resultsetFetchSize;
protected Configuration metricsConf;
@@ -55,25 +52,20 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected String tableName;
protected String outputTableName;
protected Long nativeTimeRangeDelay;
+ protected Long lastAggregatedEndTime = -1l;
+
// Explicitly name aggregators for logging needs
private final String aggregatorName;
AbstractTimelineAggregator(String aggregatorName,
PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf, Clock clk) {
+ Configuration metricsConf) {
this.aggregatorName = aggregatorName;
this.hBaseAccessor = hBaseAccessor;
this.metricsConf = metricsConf;
this.checkpointDelayMillis = SECONDS.toMillis(metricsConf.getInt(AGGREGATOR_CHECKPOINT_DELAY, 120));
this.resultsetFetchSize = metricsConf.getInt(RESULTSET_FETCH_SIZE, 2000);
this.LOG = LoggerFactory.getLogger(aggregatorName);
- this.clock = clk;
- }
-
- AbstractTimelineAggregator(String aggregatorName,
- PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- this(aggregatorName, hBaseAccessor, metricsConf, new SystemClock());
}
public AbstractTimelineAggregator(String aggregatorName,
@@ -100,84 +92,68 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
public void run() {
LOG.info("Started Timeline aggregator thread @ " + new Date());
Long SLEEP_INTERVAL = getSleepIntervalMillis();
-
- while (true) {
- long sleepTime = runOnce(SLEEP_INTERVAL);
-
- try {
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- LOG.info("Sleep interrupted, continuing with aggregation.");
- }
- }
+ runOnce(SLEEP_INTERVAL);
+ this.lastAggregatedEndTime = this.lastAggregatedEndTime + SLEEP_INTERVAL;
}
/**
* Access relaxed for tests
*/
- public long runOnce(Long SLEEP_INTERVAL) {
- long currentTime = clock.getTime();
- long lastCheckPointTime = readLastCheckpointSavingOnFirstRun(currentTime);
- long sleepTime = SLEEP_INTERVAL;
+ public void runOnce(Long SLEEP_INTERVAL) {
+ long lastCheckPointTime = readLastCheckpointSavingOnFirstRun();
if (lastCheckPointTime != -1) {
LOG.info("Last check point time: " + lastCheckPointTime + ", lagBy: "
- + ((clock.getTime() - lastCheckPointTime) / 1000)
+ + ((lastAggregatedEndTime - lastCheckPointTime) / 1000)
+ " seconds.");
- long startTime = clock.getTime();
boolean success = doWork(lastCheckPointTime, lastCheckPointTime + SLEEP_INTERVAL);
- long executionTime = clock.getTime() - startTime;
- long delta = SLEEP_INTERVAL - executionTime;
-
- if (delta > 0) {
- // Sleep for (configured sleep - time to execute task)
- sleepTime = delta;
- } else {
- // No sleep because last run took too long to execute
- LOG.info("Aggregator execution took too long, " +
- "cancelling sleep. executionTime = " + executionTime);
- sleepTime = 1;
- }
-
- LOG.debug("Aggregator sleep interval = " + sleepTime);
if (success) {
try {
- // Comment to bug fix:
- // cannot just save lastCheckPointTime + SLEEP_INTERVAL,
- // it has to be verified so it is not a time in the future
- // checkpoint says what was aggregated, and there is no way
- // the future metrics were aggregated!
- saveCheckPoint(Math.min(currentTime, lastCheckPointTime +
- SLEEP_INTERVAL));
+ saveCheckPoint(lastCheckPointTime + SLEEP_INTERVAL);
} catch (IOException io) {
LOG.warn("Error saving checkpoint, restarting aggregation at " +
"previous checkpoint.");
}
}
}
-
- return sleepTime;
}
- private long readLastCheckpointSavingOnFirstRun(long currentTime) {
+ private long readLastCheckpointSavingOnFirstRun() {
long lastCheckPointTime = -1;
try {
lastCheckPointTime = readCheckPoint();
+ LOG.info("Last Checkpoint read : " + new Date(lastCheckPointTime));
+
+ if (lastAggregatedEndTime == -1l) {
+ lastAggregatedEndTime = getRoundedAggregateTimeMillis(getSleepIntervalMillis());
+ }
+
if (isLastCheckPointTooOld(lastCheckPointTime)) {
LOG.warn("Last Checkpoint is too old, discarding last checkpoint. " +
"lastCheckPointTime = " + new Date(lastCheckPointTime));
lastCheckPointTime = -1;
}
+
+ if (lastCheckPointTime > 0) {
+ lastCheckPointTime = getRoundedCheckPointTimeMillis(lastCheckPointTime, getSleepIntervalMillis());
+ LOG.info("Rounded off checkpoint : " + new Date(lastCheckPointTime));
+ }
+
+ if (isLastCheckPointTooYoung(lastCheckPointTime)) {
+ LOG.info("Last checkpoint too recent for aggregation. Sleeping for 1 cycle.");
+ lastCheckPointTime = -1;
+ }
+
if (lastCheckPointTime == -1) {
// Assuming first run, save checkpoint and sleep.
- // Set checkpoint to 2 minutes in the past to allow the
+ // Set checkpoint to rounded time in the past to allow the
// agents/collectors to catch up
LOG.info("Saving checkpoint time on first run. " +
- new Date((currentTime - checkpointDelayMillis)));
- saveCheckPoint(currentTime - checkpointDelayMillis);
+ new Date((lastAggregatedEndTime)));
+ saveCheckPoint(lastAggregatedEndTime);
}
} catch (IOException io) {
LOG.warn("Unable to write last checkpoint time. Resuming sleep.", io);
@@ -189,8 +165,12 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
// first checkpoint is saved checkpointDelayMillis in the past,
// so here we also need to take it into account
return checkpoint != -1 &&
- ((clock.getTime() - checkpoint - checkpointDelayMillis) >
- getCheckpointCutOffIntervalMillis());
+ ((lastAggregatedEndTime - checkpoint) > getCheckpointCutOffIntervalMillis());
+ }
+
+ private boolean isLastCheckPointTooYoung(long checkpoint) {
+ return checkpoint != -1 &&
+ ((lastAggregatedEndTime <= checkpoint));
}
protected long readCheckPoint() {
@@ -227,7 +207,6 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
* @param startTime Sample start time
* @param endTime Sample end time
*/
- @Override
public boolean doWork(long startTime, long endTime) {
LOG.info("Start aggregation cycle @ " + new Date() + ", " +
"startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
@@ -292,10 +271,14 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected abstract void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException;
- protected Long getSleepIntervalMillis() {
+ public Long getSleepIntervalMillis() {
return sleepIntervalMillis;
}
+ public void setSleepIntervalMillis(Long sleepIntervalMillis) {
+ this.sleepIntervalMillis = sleepIntervalMillis;
+ }
+
protected Integer getCheckpointCutOffMultiplier() {
return checkpointCutOffMultiplier;
}
@@ -311,4 +294,22 @@ public abstract class AbstractTimelineAggregator implements TimelineMetricAggreg
protected String getCheckpointLocation() {
return checkpointLocation;
}
+
+ protected void setLastAggregatedEndTime(long lastAggregatedEndTime) {
+ this.lastAggregatedEndTime = lastAggregatedEndTime;
+ }
+
+ protected long getLastAggregatedEndTime() {
+ return lastAggregatedEndTime;
+ }
+
+ public static long getRoundedCheckPointTimeMillis(long referenceTime, long aggregatorPeriod) {
+ return referenceTime - (referenceTime % aggregatorPeriod);
+ }
+
+ public static long getRoundedAggregateTimeMillis(long aggregatorPeriod) {
+ long currentTime = System.currentTimeMillis();
+ return currentTime - (currentTime % aggregatorPeriod);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
index 3c30a6f..b7d9110 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineClusterMetric.java
@@ -94,4 +94,8 @@ public class TimelineClusterMetric {
", timestamp=" + timestamp +
'}';
}
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
index 96be48d..295db0e 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAggregator.java
@@ -31,4 +31,11 @@ public interface TimelineMetricAggregator extends Runnable {
* @return true/false
*/
public boolean isDisabled();
-}
+
+ /**
+ * Return aggregator Interval
+ * @return Interval in Millis
+ */
+ public Long getSleepIntervalMillis();
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 1c1c4b6..b7609c0 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
@@ -81,13 +79,13 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
- Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
+ Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
hBaseAccessor.saveClusterTimeAggregateRecords(hostAggregateMap, outputTableName);
}
- private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+ private Map<TimelineClusterMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
throws IOException, SQLException {
TimelineClusterMetric existingMetric = null;
@@ -106,6 +104,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
if (existingMetric == null) {
// First row
existingMetric = currentMetric;
+ currentMetric.setTimestamp(endTime);
hostAggregate = new MetricHostAggregate();
hostAggregateMap.put(currentMetric, hostAggregate);
}
@@ -117,6 +116,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
} else {
// Switched over to a new metric - save existing
hostAggregate = new MetricHostAggregate();
+ currentMetric.setTimestamp(endTime);
updateAggregatesFromHost(hostAggregate, currentHostAggregate);
hostAggregateMap.put(currentMetric, hostAggregate);
existingMetric = currentMetric;
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index 1f9b2ec..e8e16a7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -237,12 +237,12 @@ public class TimelineMetricClusterAggregatorSecond extends AbstractTimelineAggre
}
/**
- * Return beginning of the time slice into which the metric fits.
+ * Return end of the time slice into which the metric fits.
*/
private Long getSliceTimeForMetric(List<Long[]> timeSlices, Long timestamp) {
for (Long[] timeSlice : timeSlices) {
- if (timestamp >= timeSlice[0] && timestamp < timeSlice[1]) {
- return timeSlice[0];
+ if (timestamp > timeSlice[0] && timestamp <= timeSlice[1]) {
+ return timeSlice[1];
}
}
return -1l;
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index e0fa26e..a976faf 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -54,7 +54,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
@Override
protected void aggregate(ResultSet rs, long startTime, long endTime) throws IOException, SQLException {
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs);
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = aggregateMetricsFromResultSet(rs, endTime);
LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, outputTableName);
@@ -78,7 +78,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
return condition;
}
- private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs)
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet(ResultSet rs, long endTime)
throws IOException, SQLException {
TimelineMetric existingMetric = null;
MetricHostAggregate hostAggregate = null;
@@ -94,6 +94,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
if (existingMetric == null) {
// First row
existingMetric = currentMetric;
+ currentMetric.setTimestamp(endTime);
hostAggregate = new MetricHostAggregate();
hostAggregateMap.put(currentMetric, hostAggregate);
}
@@ -103,6 +104,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
hostAggregate.updateAggregates(currentHostAggregate);
} else {
// Switched over to a new metric - save existing - create new aggregate
+ currentMetric.setTimestamp(endTime);
hostAggregate = new MetricHostAggregate();
hostAggregate.updateAggregates(currentHostAggregate);
hostAggregateMap.put(currentMetric, hostAggregate);
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
index 5257412..6991b74 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricClusterAggregator.java
@@ -72,7 +72,7 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
condition.setStatement(String.format(GET_AGGREGATED_APP_METRIC_GROUPBY_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
- outputTableName, aggregateColumnName, tableName,
+ outputTableName, endTime, aggregateColumnName, tableName,
startTime, endTime));
if (LOG.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
index 1c46642..7ce03c1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/v2/TimelineMetricHostAggregator.java
@@ -65,7 +65,7 @@ public class TimelineMetricHostAggregator extends AbstractTimelineAggregator {
condition.setStatement(String.format(GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL,
PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, nativeTimeRangeDelay),
- outputTableName, tableName, startTime, endTime));
+ outputTableName, endTime, tableName, startTime, endTime));
if (LOG.isDebugEnabled()) {
LOG.debug("Condition: " + condition.toString());
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index 2fc6c34..e67a5b8 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -259,7 +259,7 @@ public class PhoenixTransactSQL {
public static final String GET_AGGREGATED_HOST_METRIC_GROUPBY_SQL = "UPSERT %s " +
"INTO %s (METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
"METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
- "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, MAX(SERVER_TIME), UNITS, " +
+ "SELECT METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, %s AS SERVER_TIME, UNITS, " +
"SUM(METRIC_SUM), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
"FROM %s WHERE SERVER_TIME >= %s AND SERVER_TIME < %s " +
"GROUP BY METRIC_NAME, HOSTNAME, APP_ID, INSTANCE_ID, UNITS";
@@ -271,7 +271,7 @@ public class PhoenixTransactSQL {
public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT %s " +
"INTO %s (METRIC_NAME, APP_ID, INSTANCE_ID, SERVER_TIME, UNITS, " +
"METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT METRIC_NAME, APP_ID, " +
- "INSTANCE_ID, MAX(SERVER_TIME), UNITS, SUM(METRIC_SUM), SUM(%s), " +
+ "INSTANCE_ID, %s AS SERVER_TIME, UNITS, SUM(METRIC_SUM), SUM(%s), " +
"MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE SERVER_TIME >= %s AND " +
"SERVER_TIME < %s GROUP BY METRIC_NAME, APP_ID, INSTANCE_ID, UNITS";
http://git-wip-us.apache.org/repos/asf/ambari/blob/bda06cfb/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
index 2b29469..8f7320b 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregatorTest.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AbstractTimelineAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
-import org.apache.hadoop.yarn.util.Clock;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -35,7 +32,6 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
public class AbstractTimelineAggregatorTest {
private AbstractTimelineAggregator agg;
- TestClock clock = new TestClock();
AtomicLong startTimeInDoWork;
AtomicLong endTimeInDoWork;
@@ -47,7 +43,7 @@ public class AbstractTimelineAggregatorTest {
@Before
public void setUp() throws Exception {
- sleepIntervalMillis = 30000l;
+ sleepIntervalMillis = 2*2*30000l; //2 minutes
checkpointCutOffMultiplier = 2;
Configuration metricsConf = new Configuration();
@@ -59,7 +55,7 @@ public class AbstractTimelineAggregatorTest {
checkPoint = new AtomicLong(-1);
actualRuns = 0;
- agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf, clock) {
+ agg = new AbstractTimelineAggregator("TimelineAggregatorTest", null, metricsConf) {
@Override
public boolean doWork(long startTime, long endTime) {
startTimeInDoWork.set(startTime);
@@ -81,7 +77,7 @@ public class AbstractTimelineAggregatorTest {
}
@Override
- protected Long getSleepIntervalMillis() {
+ public Long getSleepIntervalMillis() {
return sleepIntervalMillis;
}
@@ -110,167 +106,67 @@ public class AbstractTimelineAggregatorTest {
}
};
-
}
@Test
public void testDoWorkOnZeroDelay() throws Exception {
- // starting at time 0;
- clock.setTime(0);
+ long currentTime = System.currentTimeMillis();
+ long roundedOffAggregatorTime = AbstractTimelineAggregator.getRoundedAggregateTimeMillis(currentTime);
- long sleep = agg.runOnce(sleepIntervalMillis);
+ //Test first run of aggregator with no checkpoint
+ agg.setLastAggregatedEndTime(roundedOffAggregatorTime);
+ agg.runOnce(sleepIntervalMillis);
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
- assertEquals(0, checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
+ assertEquals(roundedOffAggregatorTime, checkPoint.get());
assertEquals("Do not aggregate on first run", 0, actualRuns);
- // exactly one sleepInterval
- clock.setTime(clock.getTime() + sleepIntervalMillis);
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime", clock.getTime() -
- sleepIntervalMillis,
- startTimeInDoWork.get());
- assertEquals("endTime", clock.getTime(),
- endTimeInDoWork.get());
- assertEquals(clock.getTime(), checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
- assertEquals(1, actualRuns);
-
- // exactly one sleepInterval
- clock.setTime(clock.getTime() + sleepIntervalMillis);
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime", clock.getTime() -
- sleepIntervalMillis,
- startTimeInDoWork.get());
- assertEquals("endTime", clock.getTime(),
- endTimeInDoWork.get());
- assertEquals(clock.getTime(), checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
- assertEquals(2, actualRuns);
-
- // checkpointCutOffMultiplier x sleepInterval - should pass,
- // it will aggregate only first part of the whole 2x interval
- // and sleep as usual (don't we need to skip some sleep?)
- //
- // effectively checkpoint will be one interval in the past,
- // so next run will
- clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
- sleepIntervalMillis));
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime after 2xinterval", clock.getTime() -
- (checkpointCutOffMultiplier * sleepIntervalMillis),
- startTimeInDoWork.get());
- assertEquals("endTime after 2xinterval", clock.getTime() -
- sleepIntervalMillis,
- endTimeInDoWork.get());
- assertEquals("checkpoint after 2xinterval", clock.getTime() -
- sleepIntervalMillis, checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
- assertEquals(3, actualRuns);
-
- // exactly one sleepInterval after one that lagged by one whole interval,
- // so it will do the previous one... and sleep as usual
- // no way to keep up
- clock.setTime(clock.getTime() + sleepIntervalMillis);
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime ", clock.getTime() -
- (checkpointCutOffMultiplier * sleepIntervalMillis),
- startTimeInDoWork.get());
- assertEquals("endTime ", clock.getTime() -
- sleepIntervalMillis,
- endTimeInDoWork.get());
- assertEquals("checkpoint ", clock.getTime() - sleepIntervalMillis,
- checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
- assertEquals(4, actualRuns);
-
-
- // checkpointCutOffMultiplier x sleepInterval - in normal state should pass,
- // but the clock lags too much, so this will not execute aggregation
- // just update checkpoint to currentTime
- clock.setTime(clock.getTime() + (checkpointCutOffMultiplier *
- sleepIntervalMillis));
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals(4, actualRuns);
- assertEquals("checkpoint after too much lag is reset to " +
- "current clock time",
- clock.getTime(), checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
- }
-
- @Test
- public void testDoWorkOnInterruptedRuns() throws Exception {
- // start at some non-zero arbitrarily selected time;
- int startingTime = 10000;
-
- // 1.
- clock.setTime(startingTime);
- long timeOfFirstStep = clock.getTime();
- long sleep = agg.runOnce(sleepIntervalMillis);
+ //Test first run with Too Old checkpoint
+ checkPoint.set(currentTime - 5*60*1000); //Old checkpoint
+ agg.runOnce(sleepIntervalMillis);
assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
- assertEquals("do not aggregate on first run", 0, actualRuns);
- assertEquals("first checkpoint set on current time", timeOfFirstStep,
- checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
-
- // 2.
- // the doWork was fast, and sleep was interrupted (e.g. restart)
- // Q: do we want to aggregate just part of the system? maybe we should
- // sleep up to next cycle start!!
- clock.setTime(timeOfFirstStep + 1);
- long timeOfSecondStep = clock.getTime();
- sleep = agg.runOnce(sleepIntervalMillis);
- assertEquals("startTime should be on previous checkpoint since it did not" +
- " run yet",
- timeOfFirstStep, startTimeInDoWork.get());
-
- assertEquals("endTime can be start + interval",
- startingTime + sleepIntervalMillis,
- endTimeInDoWork.get());
- assertEquals("should aggregate", 1, actualRuns);
- assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
- "it is currentTime in our scenario",
- timeOfSecondStep, checkPoint.get());
-
- assertEquals(sleep, sleepIntervalMillis);
-
- //3.
- // and again not a full sleep passed, so only small part was aggregated
- clock.setTime(startingTime + 2);
- long timeOfThirdStep = clock.getTime();
+ assertEquals(roundedOffAggregatorTime, checkPoint.get());
+ assertEquals("Do not aggregate on first run", 0, actualRuns);
- sleep = agg.runOnce(sleepIntervalMillis);
- // startTime and endTime are both be in the future, makes no sens,
- // query will not work!!
- assertEquals("startTime should be previous checkpoint",
- timeOfSecondStep, startTimeInDoWork.get());
+ //Test first run with too "recent" checkpoint
+ currentTime = System.currentTimeMillis();
+ checkPoint.set(currentTime - 30000);
+ agg.setLastAggregatedEndTime(-1l);
+ agg.setSleepIntervalMillis(sleepIntervalMillis);
+ agg.runOnce(sleepIntervalMillis);
+ assertEquals("startTime should be zero", 0, startTimeInDoWork.get());
+ assertEquals("endTime should be zero", 0, endTimeInDoWork.get());
+ assertEquals(agg.getLastAggregatedEndTime(), checkPoint.get());
+ assertEquals("Do not aggregate on first run", 0, actualRuns);
- assertEquals("endTime can be start + interval",
- timeOfSecondStep + sleepIntervalMillis,
- endTimeInDoWork.get());
- assertEquals("should aggregate", 2, actualRuns);
- assertEquals("checkpoint here should be set to min(endTime,currentTime), " +
- "it is currentTime in our scenario",
- timeOfThirdStep,
+ //Test first run with perfect checkpoint (sleepIntervalMillis back)
+ long checkPointTime = roundedOffAggregatorTime - sleepIntervalMillis;
+ long expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+ checkPoint.set(checkPointTime);
+ agg.setLastAggregatedEndTime(-1l);
+ agg.runOnce(sleepIntervalMillis);
+ assertEquals("startTime should the lower rounded time of the checkpoint time",
+ expectedCheckPoint, startTimeInDoWork.get());
+ assertEquals("endTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
+ expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
+ assertEquals(expectedCheckPoint + sleepIntervalMillis,
checkPoint.get());
- assertEquals(sleep, sleepIntervalMillis);
-
- }
-
- private static class TestClock implements Clock {
-
- private long time;
-
- public void setTime(long time) {
- this.time = time;
- }
+ assertEquals("Aggregate on first run", 1, actualRuns);
+
+ //Test edge case for checkpoint (2 x sleepIntervalMillis)
+ checkPointTime = roundedOffAggregatorTime - 2*sleepIntervalMillis;
+ expectedCheckPoint = AbstractTimelineAggregator.getRoundedCheckPointTimeMillis(checkPointTime, sleepIntervalMillis);
+ checkPoint.set(checkPointTime);
+ agg.runOnce(sleepIntervalMillis);
+ assertEquals("startTime should the lower rounded time of the checkpoint time",
+ expectedCheckPoint, startTimeInDoWork.get());
+ assertEquals("startTime should the lower rounded time of the checkpoint time + sleepIntervalMillis",
+ expectedCheckPoint + sleepIntervalMillis, endTimeInDoWork.get());
+ assertEquals(expectedCheckPoint + sleepIntervalMillis,
+ checkPoint.get());
+ assertEquals("Do not aggregate on first run", 2, actualRuns);
- @Override
- public long getTime() {
- return time;
- }
- }
+ }
}
\ No newline at end of file