You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/25 18:29:37 UTC
[2/2] ambari git commit: AMBARI-7680. Implement the Metric Collector
using ATS. Unit tests.
AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d817954
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d817954
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d817954
Branch: refs/heads/branch-metrics-dev
Commit: 1d8179543a8b35ce1d27962f44494efa75acf9bc
Parents: 3b877ac
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 25 09:29:17 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 25 09:29:17 2014 -0800
----------------------------------------------------------------------
.../pom.xml | 26 +-
.../timeline/AbstractTimelineAggregator.java | 255 +++----------
.../timeline/DefaultPhoenixDataSource.java | 1 +
.../timeline/HBaseTimelineMetricStore.java | 24 +-
.../metrics/timeline/MetricAggregate.java | 110 ++++++
.../timeline/MetricClusterAggregate.java | 74 ++++
.../metrics/timeline/MetricHostAggregate.java | 81 ++++
.../metrics/timeline/PhoenixHBaseAccessor.java | 34 +-
.../metrics/timeline/TimelineClusterMetric.java | 97 +++++
.../timeline/TimelineMetricAggregator.java | 147 ++++++++
.../TimelineMetricAggregatorFactory.java | 89 +++++
.../TimelineMetricAggregatorHourly.java | 198 ----------
.../TimelineMetricAggregatorMinute.java | 181 ---------
.../TimelineMetricClusterAggregator.java | 235 ++++--------
.../TimelineMetricClusterAggregatorHourly.java | 98 ++---
.../timeline/TimelineMetricConfiguration.java | 3 +
.../TestApplicationHistoryServer.java | 31 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 141 ++++---
.../AbstractPhoenixConnectionlessTest.java | 113 ++++++
.../metrics/timeline/ITClusterAggregator.java | 376 +++++++++++++++++++
.../metrics/timeline/ITMetricAggregator.java | 298 +++++++++++++++
.../metrics/timeline/TestClusterAggregator.java | 275 --------------
.../metrics/timeline/TestClusterSuite.java | 32 ++
.../metrics/timeline/TestHBaseAccessor.java | 332 ----------------
.../timeline/TestMetricHostAggregate.java | 19 +-
.../timeline/TestPhoenixTransactSQL.java | 2 +-
.../src/test/resources/hbase-default.xml | 36 ++
.../src/test/resources/log4j.properties | 1 +
.../src/test/resources/logging.properties | 3 +
29 files changed, 1756 insertions(+), 1556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 7efdb6b..ae2872d 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -214,7 +214,18 @@
</mappings>
</configuration>
</plugin>
-
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>always</forkMode>
+ <systemProperties>
+ <property>
+ <name>java.util.logging.config.file</name>
+ <value>src/test/resources/logging.properties</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
</plugins>
</build>
@@ -470,6 +481,19 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>0.98.4-hadoop2</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 43ec648..b3c1af9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Date;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -42,15 +40,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
public abstract class AbstractTimelineAggregator implements Runnable {
protected final PhoenixHBaseAccessor hBaseAccessor;
private final Log LOG;
- private static final ObjectMapper mapper;
protected final long checkpointDelayMillis;
protected final Integer resultsetFetchSize;
protected Configuration metricsConf;
- static {
- mapper = new ObjectMapper();
- }
-
public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf) {
this.hBaseAccessor = hBaseAccessor;
@@ -162,204 +155,78 @@ public abstract class AbstractTimelineAggregator implements Runnable {
FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
- // TODO: Abstract out doWork implementation for cluster and host levels
- protected abstract boolean doWork(long startTime, long endTime);
-
- protected abstract Long getSleepIntervalMillis();
-
- protected abstract Integer getCheckpointCutOffMultiplier();
-
- protected Long getCheckpointCutOffIntervalMillis() {
- return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
- }
-
- protected abstract boolean isDisabled();
-
- protected abstract String getCheckpointLocation();
-
- @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class)})
- @InterfaceAudience.Public
- @InterfaceStability.Unstable
- public static class MetricAggregate {
- protected Double sum = 0.0;
- protected Double deviation;
- protected Double max = Double.MIN_VALUE;
- protected Double min = Double.MAX_VALUE;
-
- public MetricAggregate() {
- }
+ /**
+ * Read metrics written during the time interval and save the sum and total
+ * in the aggregate table.
+ *
+ * @param startTime Sample start time
+ * @param endTime Sample end time
+ */
+ protected boolean doWork(long startTime, long endTime) {
+ LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+ "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
- protected MetricAggregate(Double sum, Double deviation, Double max,
- Double min) {
- this.sum = sum;
- this.deviation = deviation;
- this.max = max;
- this.min = min;
- }
+ boolean success = true;
+ PhoenixTransactSQL.Condition condition =
+ prepareMetricQueryCondition(startTime, endTime);
- void updateSum(Double sum) {
- this.sum += sum;
- }
+ Connection conn = null;
+ PreparedStatement stmt = null;
- void updateMax(Double max) {
- if (max > this.max) {
- this.max = max;
+ try {
+ conn = hBaseAccessor.getConnection();
+ stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+ LOG.debug("Query issued @: " + new Date());
+ ResultSet rs = stmt.executeQuery();
+ LOG.debug("Query returned @: " + new Date());
+
+ aggregate(rs, startTime, endTime);
+ LOG.info("End aggregation cycle @ " + new Date());
+
+ } catch (SQLException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } catch (IOException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
}
- }
-
- void updateMin(Double min) {
- if (min < this.min) {
- this.min = min;
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
}
}
- @JsonProperty("sum")
- Double getSum() {
- return sum;
- }
-
- @JsonProperty("deviation")
- Double getDeviation() {
- return deviation;
- }
-
- @JsonProperty("max")
- Double getMax() {
- return max;
- }
-
- @JsonProperty("min")
- Double getMin() {
- return min;
- }
-
- public void setSum(Double sum) {
- this.sum = sum;
- }
-
- public void setDeviation(Double deviation) {
- this.deviation = deviation;
- }
-
- public void setMax(Double max) {
- this.max = max;
- }
-
- public void setMin(Double min) {
- this.min = min;
- }
-
- public String toJSON() throws IOException {
- return mapper.writeValueAsString(this);
- }
+ LOG.info("End aggregation cycle @ " + new Date());
+ return success;
}
- public static class MetricClusterAggregate extends MetricAggregate {
- private int numberOfHosts;
-
- @JsonCreator
- public MetricClusterAggregate() {
- }
+ protected abstract PhoenixTransactSQL.Condition
+ prepareMetricQueryCondition(long startTime, long endTime);
- MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfHosts = numberOfHosts;
- }
+ protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+ throws IOException, SQLException;
- @JsonProperty("numberOfHosts")
- int getNumberOfHosts() {
- return numberOfHosts;
- }
-
- void updateNumberOfHosts(int count) {
- this.numberOfHosts += count;
- }
-
- public void setNumberOfHosts(int numberOfHosts) {
- this.numberOfHosts = numberOfHosts;
- }
+ protected abstract Long getSleepIntervalMillis();
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricClusterAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfHosts(hostAggregate.getNumberOfHosts());
- }
+ protected abstract Integer getCheckpointCutOffMultiplier();
- @Override
- public String toString() {
-// MetricClusterAggregate
- return "MetricAggregate{" +
- "sum=" + sum +
- ", numberOfHosts=" + numberOfHosts +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
+ protected Long getCheckpointCutOffIntervalMillis() {
+ return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
}
- /**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
- public static class MetricHostAggregate extends MetricAggregate {
-
- private long numberOfSamples = 0;
-
- @JsonCreator
- public MetricHostAggregate() {
- super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
- }
-
- public MetricHostAggregate(Double sum, int numberOfSamples,
- Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfSamples = numberOfSamples;
- }
-
- @JsonProperty("numberOfSamples")
- long getNumberOfSamples() {
- return numberOfSamples == 0 ? 1 : numberOfSamples;
- }
-
- void updateNumberOfSamples(long count) {
- this.numberOfSamples += count;
- }
-
- public void setNumberOfSamples(long numberOfSamples) {
- this.numberOfSamples = numberOfSamples;
- }
-
- public double getAvg() {
- return sum / numberOfSamples;
- }
+ protected abstract boolean isDisabled();
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricHostAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfSamples(hostAggregate.getNumberOfSamples());
- }
+ protected abstract String getCheckpointLocation();
- @Override
- public String toString() {
- return "MetricHostAggregate{" +
- "sum=" + sum +
- ", numberOfSamples=" + numberOfSamples +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
index c20dd14..679ee36 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -69,6 +69,7 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
LOG.debug("Metric store connection url: " + url);
try {
// TODO: Exception is swallowed, it should be thrown - discuss it
+
connection = DriverManager.getConnection(url);
} catch (SQLException e) {
LOG.warn("Unable to connect to HBase store using Phoenix.", e);
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index d2b96ec..a3eb731 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -24,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
import java.io.IOException;
import java.net.URL;
import java.sql.SQLException;
@@ -31,9 +33,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
public class HBaseTimelineMetricStore extends AbstractService
implements TimelineMetricStore {
@@ -82,7 +88,7 @@ public class HBaseTimelineMetricStore extends AbstractService
Configuration metricsConf) {
hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
hBaseAccessor.initMetricSchema();
-
+//...BUG...
// Start the cluster aggregator
TimelineMetricClusterAggregator minuteClusterAggregator =
new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
@@ -100,16 +106,18 @@ public class HBaseTimelineMetricStore extends AbstractService
}
// Start the 5 minute aggregator
- TimelineMetricAggregatorMinute minuteHostAggregator =
- new TimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+ (hBaseAccessor, metricsConf);
if (!minuteHostAggregator.isDisabled()) {
Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
minuteAggregatorThread.start();
}
// Start hourly host aggregator
- TimelineMetricAggregatorHourly hourlyHostAggregator =
- new TimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregator hourlyHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+ (hBaseAccessor, metricsConf);
if (!hourlyHostAggregator.isDisabled()) {
Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
aggregatorHourlyThread.start();
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+*
+*/
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ protected Double sum = 0.0;
+ protected Double deviation;
+ protected Double max = Double.MIN_VALUE;
+ protected Double min = Double.MAX_VALUE;
+
+ public MetricAggregate() {
+ }
+
+ MetricAggregate(Double sum, Double deviation, Double max,
+ Double min) {
+ this.sum = sum;
+ this.deviation = deviation;
+ this.max = max;
+ this.min = min;
+ }
+
+ void updateSum(Double sum) {
+ this.sum += sum;
+ }
+
+ void updateMax(Double max) {
+ if (max > this.max) {
+ this.max = max;
+ }
+ }
+
+ void updateMin(Double min) {
+ if (min < this.min) {
+ this.min = min;
+ }
+ }
+
+ @JsonProperty("sum")
+ Double getSum() {
+ return sum;
+ }
+
+ @JsonProperty("deviation")
+ Double getDeviation() {
+ return deviation;
+ }
+
+ @JsonProperty("max")
+ Double getMax() {
+ return max;
+ }
+
+ @JsonProperty("min")
+ Double getMin() {
+ return min;
+ }
+
+ public void setSum(Double sum) {
+ this.sum = sum;
+ }
+
+ public void setDeviation(Double deviation) {
+ this.deviation = deviation;
+ }
+
+ public void setMax(Double max) {
+ this.max = max;
+ }
+
+ public void setMin(Double min) {
+ this.min = min;
+ }
+
+ public String toJSON() throws IOException {
+ return mapper.writeValueAsString(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+*
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {
+ }
+
+ MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ void updateNumberOfHosts(int count) {
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ public String toString() {
+// MetricClusterAggregate
+ return "MetricAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+ private long numberOfSamples = 0;
+
+ @JsonCreator
+ public MetricHostAggregate() {
+ super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+ }
+
+ public MetricHostAggregate(Double sum, int numberOfSamples,
+ Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ @JsonProperty("numberOfSamples")
+ long getNumberOfSamples() {
+ return numberOfSamples == 0 ? 1 : numberOfSamples;
+ }
+
+ void updateNumberOfSamples(long count) {
+ this.numberOfSamples += count;
+ }
+
+ public void setNumberOfSamples(long numberOfSamples) {
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ public double getAvg() {
+ return sum / numberOfSamples;
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricHostAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricHostAggregate{" +
+ "sum=" + sum +
+ ", numberOfSamples=" + numberOfSamples +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 0851d07..41eb30e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,41 +28,16 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
/**
* Provides a facade over the Phoenix API to access HBase schema
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private long timestamp;
+ private String type;
+
+ TimelineClusterMetric(String metricName, String appId, String instanceId,
+ long timestamp, String type) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.timestamp = timestamp;
+ this.type = type;
+ }
+
+ String getMetricName() {
+ return metricName;
+ }
+
+ String getAppId() {
+ return appId;
+ }
+
+ String getInstanceId() {
+ return instanceId;
+ }
+
+ long getTimestamp() {
+ return timestamp;
+ }
+
+ String getType() { return type; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+ if (timestamp != that.timestamp) return false;
+ if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+ return false;
+ if (!metricName.equals(that.metricName)) return false;
+
+ return true;
+ }
+
+ public boolean equalsExceptTime(TimelineClusterMetric metric) {
+ if (!metricName.equals(metric.metricName)) return false;
+ if (!appId.equals(metric.appId)) return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+
+ return true;
+ }
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineClusterMetric{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..eaa1ab9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricAggregator.class);
+
+ private final String checkpointLocation;
+ private final Long sleepIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private final String hostAggregatorDisabledParam;
+ private final String inputTableName;
+ private final String outputTableName;
+
+ public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String inputTableName,
+ String outputTableName) {
+ super(hBaseAccessor, metricsConf);
+ this.checkpointLocation = checkpointLocation;
+ this.sleepIntervalMillis = sleepIntervalMillis;
+ this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+ this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+ this.inputTableName = inputTableName;
+ this.outputTableName = outputTableName;
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws IOException, SQLException {
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+ outputTableName);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ inputTableName));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+ (ResultSet rs) throws IOException, SQLException {
+ TimelineMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ hostAggregate.updateAggregates(currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing - create new aggregate
+ hostAggregate = new MetricHostAggregate();
+ hostAggregate.updateAggregates(currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+ }
+ return hostAggregateMap;
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..d0dafeb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
+
+/**
+ *
+ */
+public class TimelineMetricAggregatorFactory {
+ private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-checkpoint";
+ private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_CHECKPOINT_FILE);
+ long sleepInterval = metricsConf.getLong
+ (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300000l); // 5 mins
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+ String inputTableName = METRICS_RECORD_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepInterval,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName);
+ }
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+ long sleepInterval = metricsConf.getLong
+ (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+ String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepInterval,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
deleted file mode 100644
index 16f5ab9..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog
- (TimelineMetricAggregatorHourly.class);
- private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
- "timeline-metrics-host-aggregator-hourly-checkpoint";
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
-
- public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
-
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
- checkpointCutOffMultiplier =
- metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date());
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
-
- ResultSet rs = stmt.executeQuery();
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- aggregateMetricsFromResultSet(rs);
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
- METRICS_AGGREGATE_HOURLY_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
-
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
- }
-
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
- private Map<TimelineMetric, MetricHostAggregate>
- aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException {
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
-
- } else {
- // Switched over to a new metric - save existing
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
- return hostAggregateMap;
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- protected boolean isDisabled() {
- return metricsConf.getBoolean(HOST_AGGREGATOR_HOUR_DISABLED, false);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
deleted file mode 100644
index ac9d12e..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
- private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
- "timeline-metrics-host-aggregator-checkpoint";
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
-
- public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins
- checkpointCutOffMultiplier =
- metricsConf.getInt(HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
- boolean success = true;
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_RECORD_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
- LOG.debug("Query issued @: " + new Date());
- ResultSet rs = stmt.executeQuery();
- LOG.debug("Query returned @: " + new Date());
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
-
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
-
- } else {
- // Switched over to a new metric - create new aggregate
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
-
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- protected boolean isDisabled() {
- return metricsConf.getBoolean(HOST_AGGREGATOR_MINUTE_DISABLED, false);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index c52451e..96de1a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -24,27 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
/**
* Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -82,98 +74,79 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return checkpointLocation;
}
- /**
- * Read metrics written during the time interval and save the sum and total
- * in the aggregate table.
- *
- * @param startTime Sample start time
- * @param endTime Sample end time
- */
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn;
- PreparedStatement stmt;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
- LOG.debug("Query issued @: " + new Date());
- ResultSet rs = stmt.executeQuery();
- LOG.debug("Query returned @: " + new Date());
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
- List<Long[]> timeSlices = new ArrayList<Long[]>();
- // Create time slices
- long sliceStartTime = startTime;
- while (sliceStartTime < endTime) {
- timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
- sliceStartTime += timeSliceIntervalMillis;
- }
-
- while (rs.next()) {
- TimelineMetric metric =
- PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
-
- Map<TimelineClusterMetric, Double> clusterMetrics =
- sliceFromTimelineMetric(metric, timeSlices);
-
- if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
- TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
- MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
- Double avgValue = clusterMetricEntry.getValue();
-
- if (aggregate == null) {
- aggregate = new MetricClusterAggregate(avgValue, 1, null,
- avgValue, avgValue);
- aggregateClusterMetrics.put(clusterMetric, aggregate);
- } else {
- aggregate.updateSum(avgValue);
- aggregate.updateNumberOfHosts(1);
- aggregate.updateMax(avgValue);
- aggregate.updateMin(avgValue);
- }
- }
- }
- }
- LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-
- hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-
- LOG.info("End aggregation cycle @ " + new Date());
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- }
-
- return success;
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+ Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+ LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
}
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setFetchSize(resultsetFetchSize);
+ endTime, null, true);
condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
condition.setStatement(String.format(GET_METRIC_SQL,
METRICS_RECORD_TABLE_NAME));
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("APP_ID");
condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
-
return condition;
}
+ private List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> timeSlices = new ArrayList<Long[]>();
+ long sliceStartTime = startTime;
+ while (sliceStartTime < endTime) {
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+ sliceStartTime += timeSliceIntervalMillis;
+ }
+ return timeSlices;
+ }
+
+ private Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ // Create time slices
+
+ while (rs.next()) {
+ TimelineMetric metric =
+ PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+ Map<TimelineClusterMetric, Double> clusterMetrics =
+ sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+ Double avgValue = clusterMetricEntry.getValue();
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null,
+ avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ }
+ }
+ }
+ return aggregateClusterMetrics;
+ }
+
@Override
protected Long getSleepIntervalMillis() {
return sleepIntervalMillis;
@@ -239,82 +212,4 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return -1l;
}
- public static class TimelineClusterMetric {
- private String metricName;
- private String appId;
- private String instanceId;
- private long timestamp;
- private String type;
-
- TimelineClusterMetric(String metricName, String appId, String instanceId,
- long timestamp, String type) {
- this.metricName = metricName;
- this.appId = appId;
- this.instanceId = instanceId;
- this.timestamp = timestamp;
- this.type = type;
- }
-
- String getMetricName() {
- return metricName;
- }
-
- String getAppId() {
- return appId;
- }
-
- String getInstanceId() {
- return instanceId;
- }
-
- long getTimestamp() {
- return timestamp;
- }
-
- String getType() { return type; }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimelineClusterMetric that = (TimelineClusterMetric) o;
-
- if (timestamp != that.timestamp) return false;
- if (appId != null ? !appId.equals(that.appId) : that.appId != null)
- return false;
- if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
- return false;
- if (!metricName.equals(that.metricName)) return false;
-
- return true;
- }
-
- public boolean equalsExceptTime(TimelineClusterMetric metric) {
- if (!metricName.equals(metric.metricName)) return false;
- if (!appId.equals(metric.appId)) return false;
- if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
- return false;
-
- return true;
- }
- @Override
- public int hashCode() {
- int result = metricName.hashCode();
- result = 31 * result + (appId != null ? appId.hashCode() : 0);
- result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TimelineClusterMetric{" +
- "metricName='" + metricName + '\'' +
- ", appId='" + appId + '\'' +
- ", instanceId='" + instanceId + '\'' +
- ", timestamp=" + timestamp +
- '}';
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index e886b71..54d3fdd 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -24,11 +24,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -40,19 +37,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.PhoenixTransactSQL.*;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
public class TimelineMetricClusterAggregatorHourly extends
AbstractTimelineAggregator {
@@ -77,7 +62,8 @@ public class TimelineMetricClusterAggregatorHourly extends
sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
(CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
- checkpointCutOffIntervalMillis = 7200000l;
+ checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
checkpointCutOffMultiplier = metricsConf.getInt
(CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
}
@@ -88,60 +74,31 @@ public class TimelineMetricClusterAggregatorHourly extends
}
@Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date
- (endTime));
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
-
- ResultSet rs = stmt.executeQuery();
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
aggregateMetricsFromResultSet(rs);
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveClusterAggregateHourlyRecords(
- hostAggregateMap,
- METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ }
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
}
- // should rewrite from host agg to cluster agg
- //
private Map<TimelineClusterMetric, MetricHostAggregate>
aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
@@ -189,19 +146,6 @@ public class TimelineMetricClusterAggregatorHourly extends
agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
}
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new Condition
- (null, null, null, null, startTime, endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
@Override
protected Long getSleepIntervalMillis() {
return sleepIntervalMillis;
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6b19847..e022e5e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -86,6 +86,9 @@ public interface TimelineMetricConfiguration {
public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
+ public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+ "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
public static final String GLOBAL_RESULT_LIMIT =
"timeline.metrics.service.default.result.limit";