You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/11/25 18:29:36 UTC
[1/2] ambari git commit: AMBARI-7680. Implement the Metric Collector
using ATS. Unit tests.
Repository: ambari
Updated Branches:
refs/heads/branch-metrics-dev 3b877ac82 -> 1d8179543
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 71dbec2..3720852 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -24,19 +24,19 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixHBaseAccessor;
import org.apache.zookeeper.ClientCnxn;
import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -46,22 +46,21 @@ import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
import static org.easymock.EasyMock.createNiceMock;
-import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.*;
import static org.easymock.EasyMock.expectLastCall;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
+import static org.junit.Assert.*;
+import static org.powermock.api.easymock.PowerMock.*;
import static org.powermock.api.support.membermodification.MemberMatcher.method;
-import static org.powermock.api.support.membermodification.MemberModifier.suppress;
+import static org.powermock.api.support.membermodification.MemberModifier
+ .suppress;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ PhoenixHBaseAccessor.class, UserGroupInformation.class,
- ClientCnxn.class })
+ ClientCnxn.class, DefaultPhoenixDataSource.class})
@PowerMockIgnore( {"javax.management.*"})
public class TestApplicationHistoryServer {
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index db39250..528f1b1 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -15,99 +15,96 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
-import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
-import org.apache.phoenix.jdbc.PhoenixTestDriver;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
-import org.apache.phoenix.util.TestUtil;
import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.PreparedStatement;
+import java.sql.*;
+import java.util.Map;
+import java.util.Properties;
-import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
-public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
+public abstract class AbstractMiniHbaseClusterTest extends BaseTest {
- protected static String getUrl() {
- return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
- }
+ protected static final long BATCH_SIZE = 3;
- protected static String getUrl(String tenantId) {
- return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = getDefaultProps();
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+ props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+ // Make a small batch size to test multiple calls to reserve sequences
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+ Long.toString(BATCH_SIZE));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
- protected static PhoenixTestDriver driver;
-
- private static void startServer(String url) throws Exception {
- assertNull(driver);
- // only load the test driver if we are testing locally - for integration tests, we want to
- // test on a wider scale
- if (PhoenixEmbeddedDriver.isTestUrl(url)) {
- driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
- assertTrue(DriverManager.getDriver(url) == driver);
- driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
- }
+ @AfterClass
+ public static void doTeardown() throws Exception {
+ dropNonSystemTables();
}
- protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
- if (driver == null) {
- driver = new PhoenixTestDriver(props);
- DriverManager.registerDriver(driver);
- }
- return driver;
+ @After
+ public void cleanUpAfterTest() throws Exception {
+ deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl());
}
- private String connUrl;
+ public static Map<String, String> getDefaultProps() {
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(5);
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ Boolean.FALSE.toString());
+ return props;
+ }
- @Before
- public void setup() throws Exception {
- connUrl = getUrl();
- startServer(connUrl);
+ protected Connection getConnection(String url) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ return conn;
}
+ /**
+ * A canary test. Will show if the infrastructure is set-up correctly.
+ */
@Test
- public void testStorageSystemInitialized() throws Exception {
- String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
- "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
- "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
- Connection conn = null;
- PreparedStatement stmt = null;
- try {
- conn = DriverManager.getConnection(connUrl);
- stmt = conn.prepareStatement(sampleDDL);
- stmt.execute();
- conn.commit();
- } finally {
- if (stmt != null) {
- stmt.close();
- }
- if (conn != null) {
- conn.close();
- }
- }
- }
+ public void testClusterOK() throws Exception {
+ Connection conn = getConnection(getUrl());
+ conn.setAutoCommit(true);
- @After
- public void tearDown() throws Exception {
- if (driver != null) {
- try {
- driver.close();
- } finally {
- PhoenixTestDriver phoenixTestDriver = driver;
- driver = null;
- DriverManager.deregisterDriver(phoenixTestDriver);
- }
- }
+ String sampleDDL = "CREATE TABLE TEST_METRICS " +
+ "(TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+ "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " +
+ "TTL=86400, COMPRESSION='NONE' ";
+
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate(sampleDDL);
+ conn.commit();
+
+ ResultSet rs = stmt.executeQuery(
+ "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+ rs.next();
+ long l = rs.getLong(1);
+ assertThat(l).isGreaterThanOrEqualTo(0);
+
+ stmt.execute("DROP TABLE TEST_METRICS");
+ conn.close();
}
-}
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
new file mode 100644
index 0000000..e7f2d5c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {
+
+ protected static String getUrl() {
+ return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+ }
+
+ protected static String getUrl(String tenantId) {
+ return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+ }
+
+ protected static PhoenixTestDriver driver;
+
+ private static void startServer(String url) throws Exception {
+ assertNull(driver);
+ // only load the test driver if we are testing locally - for integration tests, we want to
+ // test on a wider scale
+ if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+ driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+ assertTrue(DriverManager.getDriver(url) == driver);
+ driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ }
+ }
+
+ protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+ if (driver == null) {
+ driver = new PhoenixTestDriver(props);
+ DriverManager.registerDriver(driver);
+ }
+ return driver;
+ }
+
+ private String connUrl;
+
+ @Before
+ public void setup() throws Exception {
+ connUrl = getUrl();
+ startServer(connUrl);
+ }
+
+ @Test
+ public void testStorageSystemInitialized() throws Exception {
+ String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
+ "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try {
+ conn = DriverManager.getConnection(connUrl);
+ stmt = conn.prepareStatement(sampleDDL);
+ stmt.execute();
+ conn.commit();
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (driver != null) {
+ try {
+ driver.close();
+ } finally {
+ PhoenixTestDriver phoenixTestDriver = driver;
+ driver = null;
+ DriverManager.deregisterDriver(phoenixTestDriver);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
new file mode 100644
index 0000000..7a8a7d8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+
+public class ITClusterAggregator extends AbstractMiniHbaseClusterTest {
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ // inits connection, starts mini cluster
+ conn = getConnection(getUrl());
+
+ hdb.initMetricSchema();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+
+ stmt.close();
+ conn.close();
+ }
+
+ @Test
+ public void testShouldAggregateClusterProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregator agg =
+ new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 2));
+ ctime += minute;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 1));
+
+ // WHEN
+ long endTime = ctime + minute;
+ boolean success = agg.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(3.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ }
+
+
+ @Test
+ public void testShouldAggregateDifferentMetricsOnClusterProperly()
+ throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregator agg =
+ new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+ // here we put some metrics tha will be aggregated
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_used", 1));
+
+ ctime += minute;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_used", 1));
+
+ // WHEN
+ long endTime = ctime + minute;
+ boolean success = agg.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(3.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else if ("disk_used".equals(currentMetric.getMetricName())) {
+ assertEquals(1, currentHostAggregate.getNumberOfHosts());
+ assertEquals(1.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(1.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ }
+
+
+ @Test
+ public void testShouldAggregateClusterOnHourProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregatorHourly agg =
+ new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+ // this time can be virtualized! or made independent from real clock
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ records.put(createEmptyTimelineMetric(ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+ hdb.saveClusterAggregateRecords(records);
+
+ // WHEN
+ agg.doWork(startTime, ctime + minute);
+
+ // THEN
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+ int count = 0;
+ while (rs.next()) {
+ assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ count++;
+ }
+
+ assertEquals("One hourly aggregated row expected ", 1, count);
+ }
+
+ @Test
+ public void testShouldAggregateDifferentMetricsOnHourProperly() throws
+ Exception {
+ // GIVEN
+ TimelineMetricClusterAggregatorHourly agg =
+ new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ hdb.saveClusterAggregateRecords(records);
+
+ // WHEN
+ agg.doWork(startTime, ctime + minute);
+
+ // THEN
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+ int count = 0;
+ while (rs.next()) {
+ if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+ }
+
+ count++;
+ }
+
+ assertEquals("Two hourly aggregated row expected ", 2, count);
+ }
+
+ private ResultSet executeQuery(String query) throws SQLException {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ return stmt.executeQuery(query);
+ }
+
+ private TimelineClusterMetric createEmptyTimelineMetric(String name,
+ long startTime) {
+ TimelineClusterMetric metric = new TimelineClusterMetric(name,
+ "test_app", null, startTime, null);
+
+ return metric;
+ }
+
+ private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
+ return createEmptyTimelineMetric("disk_used", startTime);
+ }
+
+ private MetricHostAggregate
+ createMetricHostAggregate(double max, double min, int numberOfSamples,
+ double sum) {
+ MetricHostAggregate expectedAggregate =
+ new MetricHostAggregate();
+ expectedAggregate.setMax(max);
+ expectedAggregate.setMin(min);
+ expectedAggregate.setNumberOfSamples(numberOfSamples);
+ expectedAggregate.setSum(sum);
+
+ return expectedAggregate;
+ }
+
+ private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(
+ TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+ return
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ metricsConf,
+ new ConnectionProvider() {
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
+ }
+
+ private TimelineMetrics prepareSingleTimelineMetric(long startTime,
+ String host,
+ String metricName,
+ double val) {
+ TimelineMetrics m = new TimelineMetrics();
+ m.setMetrics(Arrays.asList(
+ createTimelineMetric(startTime, metricName, host, val)));
+
+ return m;
+ }
+
+ private TimelineMetric createTimelineMetric(long startTime,
+ String metricName,
+ String host,
+ double val) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, val);
+ vals.put(startTime + 30000l, val);
+ vals.put(startTime + 45000l, val);
+ vals.put(startTime + 60000l, val);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
new file mode 100644
index 0000000..19e5f21
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITMetricAggregator extends AbstractMiniHbaseClusterTest {
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ // inits connection, starts mini cluster
+ conn = getConnection(getUrl());
+
+ hdb.initMetricSchema();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+
+ stmt.close();
+ conn.close();
+ }
+
+ @Test
+ public void testShouldInsertMetrics() throws Exception {
+ // GIVEN
+
+ // WHEN
+ long startTime = System.currentTimeMillis();
+ TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+ hdb.insertMetricRecords(metricsSent);
+
+ Condition queryCondition = new Condition(null, "local", null, null,
+ startTime, startTime + (15 * 60 * 1000), null, false);
+ TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
+
+ // THEN
+ assertThat(recordRead.getMetrics()).hasSize(2)
+ .extracting("metricName")
+ .containsOnly("mem_free", "disk_free");
+
+ assertThat(metricsSent.getMetrics())
+ .usingElementComparator(TIME_IGNORING_COMPARATOR)
+ .containsExactlyElementsOf(recordRead.getMetrics());
+ }
+
+ @Test
+ public void testShouldAggregateMinuteProperly() throws Exception {
+ // GIVEN
+// TimelineMetricAggregatorMinute aggregatorMinute =
+// new TimelineMetricAggregatorMinute(hdb, new Configuration());
+ TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
+ .createTimelineMetricAggregatorMinute(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+ // WHEN
+ long endTime = startTime + 1000 * 60 * 4;
+ boolean success = aggregatorMinute.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+ MetricHostAggregate expectedAggregate =
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+ int count = 0;
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
+ } else if ("mem_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ assertEquals("Two aggregated entries expected", 2, count);
+ }
+
+ @Test
+ public void testShouldAggregateHourProperly() throws Exception {
+ // GIVEN
+// TimelineMetricAggregatorHourly aggregator =
+// new TimelineMetricAggregatorHourly(hdb, new Configuration());
+
+ TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
+ .createTimelineMetricAggregatorHourly(hdb, new Configuration());
+ long startTime = System.currentTimeMillis();
+
+ MetricHostAggregate expectedAggregate =
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+ Map<TimelineMetric, MetricHostAggregate>
+ aggMap = new HashMap<TimelineMetric,
+ MetricHostAggregate>();
+
+ int min_5 = 5 * 60 * 1000;
+ long ctime = startTime - min_5;
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+ hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+ //WHEN
+ long endTime = ctime + min_5;
+ boolean success = aggregator.doWork(startTime, endTime);
+ assertTrue(success);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if ("disk_used".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(12 * 15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ }
+ }
+ }
+
+ private TimelineMetric createEmptyTimelineMetric(long startTime) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName("disk_used");
+ metric.setAppId("test_app");
+ metric.setHostName("test_host");
+ metric.setTimestamp(startTime);
+
+ return metric;
+ }
+
+ private MetricHostAggregate
+ createMetricHostAggregate(double max, double min, int numberOfSamples,
+ double sum) {
+ MetricHostAggregate expectedAggregate =
+ new MetricHostAggregate();
+ expectedAggregate.setMax(max);
+ expectedAggregate.setMin(min);
+ expectedAggregate.setNumberOfSamples(numberOfSamples);
+ expectedAggregate.setSum(sum);
+
+ return expectedAggregate;
+ }
+
+ private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(
+ TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+ return
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ metricsConf,
+ new ConnectionProvider() {
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
+ }
+
+ private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+ new Comparator<TimelineMetric>() {
+ @Override
+ public int compare(TimelineMetric o1, TimelineMetric o2) {
+ return o1.equalsExceptTime(o2) ? 0 : 1;
+ }
+ };
+
+ private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
+ TimelineMetrics metrics = new TimelineMetrics();
+ metrics.setMetrics(Arrays.asList(
+ createMetric(startTime, "disk_free", host),
+ createMetric(startTime, "mem_free", host)));
+
+ return metrics;
+ }
+
+ private TimelineMetric createMetric(long startTime,
+ String metricName,
+ String host) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, 0.0);
+ vals.put(startTime + 30000l, 0.0);
+ vals.put(startTime + 45000l, 1.0);
+ vals.put(startTime + 60000l, 2.0);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
deleted file mode 100644
index ac14ca3..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterAggregator.java
+++ /dev/null
@@ -1,275 +0,0 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.sql.*;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Map;
-
-import static junit.framework.Assert.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class TestClusterAggregator {
- private static String MY_LOCAL_URL =
- "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
- private Connection conn;
- private PhoenixHBaseAccessor hdb;
-
-
- @Before
- public void setUp() throws Exception {
- hdb = createTestableHBaseAccessor();
- conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
-
- hdb.initMetricSchema();
- }
-
- @After
- public void tearDown() throws Exception {
- Connection conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
-
- stmt.execute("delete from METRIC_AGGREGATE");
- stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
- stmt.execute("delete from METRIC_RECORD");
- stmt.execute("delete from METRIC_RECORD_HOURLY");
- stmt.execute("delete from METRIC_RECORD_MINUTE");
- conn.commit();
-
- stmt.close();
- conn.close();
- }
-
- /**
- * A canary test.
- *
- * @throws Exception
- */
- @Test
- public void testClusterOK() throws Exception {
- Connection conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
- String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
- "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
- "DATA_BLOCK_ENCODING='FAST_DIFF', " +
- "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
- stmt.executeUpdate(sampleDDL);
- ResultSet rs = stmt.executeQuery(
- "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
-
- rs.next();
- long l = rs.getLong(1);
- assertThat(l).isGreaterThanOrEqualTo(0);
-
- stmt.execute("DROP TABLE TEST_METRICS");
- }
-
- @Test
- public void testShouldAggregateClusterProperly() throws Exception {
- // GIVEN
- TimelineMetricClusterAggregator agg =
- new TimelineMetricClusterAggregator(hdb, new Configuration());
-
- long startTime = System.currentTimeMillis();
- long ctime = startTime;
- long minute = 60 * 1000;
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 1));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 2));
- ctime += minute;
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1", 2));
- hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2", 1));
-
- // WHEN
- long endTime = ctime + minute;
- boolean success = agg.doWork(startTime, endTime);
-
- //THEN
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
-
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
- ResultSet rs = pstmt.executeQuery();
- MetricHostAggregate expectedAggregate =
- createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
- int recordCount = 0;
- while (rs.next()) {
- TimelineClusterMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
- MetricClusterAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
-
- if ("disk_free".equals(currentMetric.getMetricName())) {
- assertEquals(2, currentHostAggregate.getNumberOfHosts());
- assertEquals(2.0, currentHostAggregate.getMax());
- assertEquals(1.0, currentHostAggregate.getMin());
- assertEquals(3.0, currentHostAggregate.getSum());
- recordCount++;
- } else {
- fail("Unexpected entry");
- }
- }
- }
-
- @Test
- public void testShouldAggregateClusterOnHourProperly() throws Exception {
- // GIVEN
- TimelineMetricClusterAggregatorHourly agg =
- new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
-
- // this time can be virtualized! or made independent from real clock
- long startTime = System.currentTimeMillis();
- long ctime = startTime;
- long minute = 60 * 1000;
-
- Map<TimelineClusterMetric, MetricClusterAggregate> records =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
-
- records.put(createEmptyTimelineMetric(ctime),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
- records.put(createEmptyTimelineMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
- records.put(createEmptyTimelineMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
- records.put(createEmptyTimelineMetric(ctime += minute),
- new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
-
- hdb.saveClusterAggregateRecords(records);
-
- // WHEN
- agg.doWork(startTime, ctime + minute);
-
- // THEN
- ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
- int count = 0;
- while (rs.next()) {
- assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
- assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
- assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
- assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
- assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
- assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
- count++;
- }
-
- assertEquals("One hourly aggregated row expected ",1, count);
-
- System.out.println(rs);
-
-
- }
-
- private ResultSet executeQuery(String query) throws SQLException {
- Connection conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
- return stmt.executeQuery(query);
- }
-
- private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
- TimelineClusterMetric metric = new TimelineClusterMetric("disk_used",
- "test_app", null, startTime, null);
-
- return metric;
- }
-
- private MetricHostAggregate
- createMetricHostAggregate(double max, double min, int numberOfSamples,
- double sum) {
- MetricHostAggregate expectedAggregate =
- new MetricHostAggregate();
- expectedAggregate.setMax(max);
- expectedAggregate.setMin(min);
- expectedAggregate.setNumberOfSamples(numberOfSamples);
- expectedAggregate.setSum(sum);
-
- return expectedAggregate;
- }
-
- private PhoenixHBaseAccessor createTestableHBaseAccessor() {
- return
- new PhoenixHBaseAccessor(
- new Configuration(),
- new Configuration(),
- new ConnectionProvider() {
- @Override
- public Connection getConnection() {
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(getUrl());
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
- }
- return connection;
- }
- });
- }
-
- private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
- new Comparator<TimelineMetric>() {
- @Override
- public int compare(TimelineMetric o1, TimelineMetric o2) {
- return o1.equalsExceptTime(o2) ? 0 : 1;
- }
- };
-
- private TimelineMetrics prepareSingleTimelineMetric(long startTime,
- String host,
- double val) {
- TimelineMetrics m = new TimelineMetrics();
- m.setMetrics(Arrays.asList(
- createTimelineMetric(startTime, "disk_free", host, val)));
-
- return m;
- }
-
- private TimelineMetric createTimelineMetric(long startTime,
- String metricName,
- String host,
- double val) {
- TimelineMetric m = new TimelineMetric();
- m.setAppId("host");
- m.setHostName(host);
- m.setMetricName(metricName);
- m.setStartTime(startTime);
- Map<Long, Double> vals = new HashMap<Long, Double>();
- vals.put(startTime + 15000l, val);
- vals.put(startTime + 30000l, val);
- vals.put(startTime + 45000l, val);
- vals.put(startTime + 60000l, val);
-
- m.setMetricValues(vals);
-
- return m;
- }
-
-
- protected static String getUrl() {
- return MY_LOCAL_URL;
-// return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
- }
-
- private static Connection getConnection(String url) throws SQLException {
- return DriverManager.getConnection(url);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
new file mode 100644
index 0000000..6aa227b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static org.junit.runners.Suite.SuiteClasses;
+
+@RunWith(Suite.class)
+@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class})
+public class TestClusterSuite {
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
deleted file mode 100644
index a10cfeb..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestHBaseAccessor.java
+++ /dev/null
@@ -1,332 +0,0 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.junit.*;
-
-import java.sql.*;
-import java.util.*;
-
-import static junit.framework.Assert.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.PhoenixTransactSQL.*;
-import static org.assertj.core.api.Assertions.assertThat;
-
-public class TestHBaseAccessor {
- private static String MY_LOCAL_URL =
- "jdbc:phoenix:c6503.ambari.apache.org:" + 2181 + ":/hbase";
- private Connection conn;
- private PhoenixHBaseAccessor hdb;
-
-
- @Before
- public void setUp() throws Exception {
- hdb = createTestableHBaseAccessor();
- conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
-
- hdb.initMetricSchema();
- }
-
- @After
- public void tearDown() throws Exception {
- Connection conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
-
- stmt.execute("delete from METRIC_AGGREGATE");
- stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
- stmt.execute("delete from METRIC_RECORD");
- stmt.execute("delete from METRIC_RECORD_HOURLY");
- stmt.execute("delete from METRIC_RECORD_MINUTE");
- conn.commit();
-
- stmt.close();
- conn.close();
- }
- /**
- * A canary test.
- *
- * @throws Exception
- */
- @Test
- public void testClusterOK() throws Exception {
- Connection conn = getConnection(getUrl());
- Statement stmt = conn.createStatement();
- String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
- "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
- "DATA_BLOCK_ENCODING='FAST_DIFF', " +
- "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
-
- stmt.executeUpdate(sampleDDL);
- ResultSet rs = stmt.executeQuery(
- "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
-
- rs.next();
- long l = rs.getLong(1);
- assertThat(l).isGreaterThanOrEqualTo(0);
-
- stmt.execute("DROP TABLE TEST_METRICS");
- }
-
- // @Test
- public void testShouldInsertMetrics() throws Exception {
- // GIVEN
-
- // WHEN
- long startTime = System.currentTimeMillis();
- TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
- hdb.insertMetricRecords(metricsSent);
-
- Condition queryCondition = new Condition(null, "local", null, null,
- startTime, startTime + (15 * 60 * 1000), null, false);
- TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
-
- // THEN
- assertThat(recordRead.getMetrics()).hasSize(2)
- .extracting("metricName")
- .containsOnly("mem_free", "disk_free");
-
- assertThat(metricsSent.getMetrics())
- .usingElementComparator(TIME_IGNORING_COMPARATOR)
- .containsExactlyElementsOf(recordRead.getMetrics());
- }
-
- // @Test
- public void testShouldAggregateMinuteProperly() throws Exception {
- // GIVEN
- TimelineMetricAggregatorMinute aggregatorMinute =
- new TimelineMetricAggregatorMinute(hdb, new Configuration());
-
- long startTime = System.currentTimeMillis();
- long ctime = startTime;
- long minute = 60 * 1000;
- hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
- hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
- hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
- hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
- hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
-
- // WHEN
- long endTime = startTime + 1000 * 60 * 4;
- boolean success = aggregatorMinute.doWork(startTime, endTime);
-
- //THEN
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME));
-
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
- ResultSet rs = pstmt.executeQuery();
- AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
- createMetricHostAggregate(2.0, 0.0, 20, 15.0);
-
- int count = 0;
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if ("disk_free".equals(currentMetric.getMetricName())) {
- assertEquals(2.0, currentHostAggregate.getMax());
- assertEquals(0.0, currentHostAggregate.getMin());
- assertEquals(20, currentHostAggregate.getNumberOfSamples());
- assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
- count++;
- } else if ("mem_free".equals(currentMetric.getMetricName())) {
- assertEquals(2.0, currentHostAggregate.getMax());
- assertEquals(0.0, currentHostAggregate.getMin());
- assertEquals(20, currentHostAggregate.getNumberOfSamples());
- assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
- count++;
- } else {
- fail("Unexpected entry");
- }
- }
- assertEquals("Two aggregated entries expected", 2, count);
- }
-
- // @Test
- public void testShouldAggregateHourProperly() throws Exception {
- // GIVEN
- TimelineMetricAggregatorHourly aggregator =
- new TimelineMetricAggregatorHourly(hdb, new Configuration());
- long startTime = System.currentTimeMillis();
-
- AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
- createMetricHostAggregate(2.0, 0.0, 20, 15.0);
- Map<TimelineMetric, AbstractTimelineAggregator.MetricHostAggregate>
- aggMap = new HashMap<TimelineMetric,
- AbstractTimelineAggregator.MetricHostAggregate>();
-
- int min_5 = 5 * 60 * 1000;
- long ctime = startTime - min_5;
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
- aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
-
- hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
- //WHEN
- long endTime = ctime + min_5;
- boolean success = aggregator.doWork(startTime, endTime);
- assertTrue(success);
-
- //THEN
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_AGGREGATE_HOURLY_TABLE_NAME));
-
- PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
- (conn, condition);
- ResultSet rs = pstmt.executeQuery();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- AbstractTimelineAggregator.MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if ("disk_used".equals(currentMetric.getMetricName())) {
- assertEquals(2.0, currentHostAggregate.getMax());
- assertEquals(0.0, currentHostAggregate.getMin());
- assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
- assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
- }
- }
- }
-
- private TimelineMetric createEmptyTimelineMetric(long startTime) {
- TimelineMetric metric = new TimelineMetric();
- metric.setMetricName("disk_used");
- metric.setAppId("test_app");
- metric.setHostName("test_host");
- metric.setTimestamp(startTime);
-
- return metric;
- }
-
- private AbstractTimelineAggregator.MetricHostAggregate
- createMetricHostAggregate(double max, double min, int numberOfSamples,
- double sum) {
- AbstractTimelineAggregator.MetricHostAggregate expectedAggregate =
- new AbstractTimelineAggregator.MetricHostAggregate();
- expectedAggregate.setMax(max);
- expectedAggregate.setMin(min);
- expectedAggregate.setNumberOfSamples(numberOfSamples);
- expectedAggregate.setSum(sum);
-
- return expectedAggregate;
- }
-
- private PhoenixHBaseAccessor createTestableHBaseAccessor() {
- return
- new PhoenixHBaseAccessor(
- new Configuration(),
- new Configuration(),
- new ConnectionProvider() {
- @Override
- public Connection getConnection() {
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(getUrl());
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
- }
- return connection;
- }
- });
- }
-
- private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
- new Comparator<TimelineMetric>() {
- @Override
- public int compare(TimelineMetric o1, TimelineMetric o2) {
- return o1.equalsExceptTime(o2) ? 0 : 1;
- }
- };
-
- private TimelineMetrics prepareSingleTimelineMetric(long startTime,
- String host,
- double val) {
- TimelineMetrics m = new TimelineMetrics();
- m.setMetrics(Arrays.asList(
- createTimelineMetric(startTime, "disk_free", host, val)));
-
- return m;
- }
-
- private TimelineMetric createTimelineMetric(long startTime,
- String metricName,
- String host,
- double val) {
- TimelineMetric m = new TimelineMetric();
- m.setAppId("host");
- m.setHostName(host);
- m.setMetricName(metricName);
- m.setStartTime(startTime);
- Map<Long, Double> vals = new HashMap<Long, Double>();
- vals.put(startTime + 15000l, val);
- vals.put(startTime + 30000l, val);
- vals.put(startTime + 45000l, val);
- vals.put(startTime + 60000l, val);
-
- m.setMetricValues(vals);
-
- return m;
- }
-
- private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
- TimelineMetrics metrics = new TimelineMetrics();
- metrics.setMetrics(Arrays.asList(
- createMetric(startTime, "disk_free", host),
- createMetric(startTime, "mem_free", host)));
-
- return metrics;
- }
-
- private TimelineMetric createMetric(long startTime,
- String metricName,
- String host) {
- TimelineMetric m = new TimelineMetric();
- m.setAppId("host");
- m.setHostName(host);
- m.setMetricName(metricName);
- m.setStartTime(startTime);
- Map<Long, Double> vals = new HashMap<Long, Double>();
- vals.put(startTime + 15000l, 0.0);
- vals.put(startTime + 30000l, 0.0);
- vals.put(startTime + 45000l, 1.0);
- vals.put(startTime + 60000l, 2.0);
-
- m.setMetricValues(vals);
-
- return m;
- }
-
- protected static String getUrl() {
- return MY_LOCAL_URL;
-// return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
- }
-
- private static Connection getConnection(String url) throws SQLException {
- return DriverManager.getConnection(url);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index aebbdb3..5d8ba96 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -1,10 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline;
import org.junit.Test;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.AbstractTimelineAggregator.MetricHostAggregate;
import static org.assertj.core.api.Assertions.assertThat;
public class TestMetricHostAggregate {
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
index b0f0153..758f5a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-public class TestPhoenixTransactSQL extends AbstractMiniHBaseClusterTest {
+public class TestPhoenixTransactSQL {
@Test
public void testConditionClause() throws Exception {
Condition condition = new Condition(
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
new file mode 100644
index 0000000..7502346
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/hbase-default.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+ <property>
+ <name>hbase.defaults.for.version.skip</name>
+ <value>true</value>
+ <description>
+ Set to true to skip the 'hbase.defaults.for.version' check.
+ Setting this to true can be useful in contexts other than
+ the other side of a maven generation; i.e. running in an
+ ide. You'll want to set this boolean to true to avoid
+ seeing the RuntimException complaint: "hbase-default.xml file
+ seems to be for and old version of HBase (@@@VERSION@@@), this
+ version is X.X.X-SNAPSHOT"
+ </description>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8520d48
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/log4j.properties
@@ -0,0 +1 @@
+log4j.rootLogger=OFF
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
new file mode 100644
index 0000000..f5edede
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/test/resources/logging.properties
@@ -0,0 +1,3 @@
+.level = OFF
+java.util.logging.ConsoleHandler.level=OFF
+java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
[2/2] ambari git commit: AMBARI-7680. Implement the Metric Collector
using ATS. Unit tests.
Posted by sw...@apache.org.
AMBARI-7680. Implement the Metric Collector using ATS. Unit tests.
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1d817954
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1d817954
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1d817954
Branch: refs/heads/branch-metrics-dev
Commit: 1d8179543a8b35ce1d27962f44494efa75acf9bc
Parents: 3b877ac
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Tue Nov 25 09:29:17 2014 -0800
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Tue Nov 25 09:29:17 2014 -0800
----------------------------------------------------------------------
.../pom.xml | 26 +-
.../timeline/AbstractTimelineAggregator.java | 255 +++----------
.../timeline/DefaultPhoenixDataSource.java | 1 +
.../timeline/HBaseTimelineMetricStore.java | 24 +-
.../metrics/timeline/MetricAggregate.java | 110 ++++++
.../timeline/MetricClusterAggregate.java | 74 ++++
.../metrics/timeline/MetricHostAggregate.java | 81 ++++
.../metrics/timeline/PhoenixHBaseAccessor.java | 34 +-
.../metrics/timeline/TimelineClusterMetric.java | 97 +++++
.../timeline/TimelineMetricAggregator.java | 147 ++++++++
.../TimelineMetricAggregatorFactory.java | 89 +++++
.../TimelineMetricAggregatorHourly.java | 198 ----------
.../TimelineMetricAggregatorMinute.java | 181 ---------
.../TimelineMetricClusterAggregator.java | 235 ++++--------
.../TimelineMetricClusterAggregatorHourly.java | 98 ++---
.../timeline/TimelineMetricConfiguration.java | 3 +
.../TestApplicationHistoryServer.java | 31 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 141 ++++---
.../AbstractPhoenixConnectionlessTest.java | 113 ++++++
.../metrics/timeline/ITClusterAggregator.java | 376 +++++++++++++++++++
.../metrics/timeline/ITMetricAggregator.java | 298 +++++++++++++++
.../metrics/timeline/TestClusterAggregator.java | 275 --------------
.../metrics/timeline/TestClusterSuite.java | 32 ++
.../metrics/timeline/TestHBaseAccessor.java | 332 ----------------
.../timeline/TestMetricHostAggregate.java | 19 +-
.../timeline/TestPhoenixTransactSQL.java | 2 +-
.../src/test/resources/hbase-default.xml | 36 ++
.../src/test/resources/log4j.properties | 1 +
.../src/test/resources/logging.properties | 3 +
29 files changed, 1756 insertions(+), 1556 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
index 7efdb6b..ae2872d 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/pom.xml
@@ -214,7 +214,18 @@
</mappings>
</configuration>
</plugin>
-
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>always</forkMode>
+ <systemProperties>
+ <property>
+ <name>java.util.logging.config.file</name>
+ <value>src/test/resources/logging.properties</value>
+ </property>
+ </systemProperties>
+ </configuration>
+ </plugin>
</plugins>
</build>
@@ -470,6 +481,19 @@
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>0.98.4-hadoop2</version>
+ <scope>test</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jruby</groupId>
+ <artifactId>jruby-complete</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
index 43ec648..b3c1af9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractTimelineAggregator.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.annotate.JsonSubTypes;
-import org.codehaus.jackson.map.ObjectMapper;
import java.io.File;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.Date;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -42,15 +40,10 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
public abstract class AbstractTimelineAggregator implements Runnable {
protected final PhoenixHBaseAccessor hBaseAccessor;
private final Log LOG;
- private static final ObjectMapper mapper;
protected final long checkpointDelayMillis;
protected final Integer resultsetFetchSize;
protected Configuration metricsConf;
- static {
- mapper = new ObjectMapper();
- }
-
public AbstractTimelineAggregator(PhoenixHBaseAccessor hBaseAccessor,
Configuration metricsConf) {
this.hBaseAccessor = hBaseAccessor;
@@ -162,204 +155,78 @@ public abstract class AbstractTimelineAggregator implements Runnable {
FileUtils.writeStringToFile(checkpoint, String.valueOf(checkpointTime));
}
- // TODO: Abstract out doWork implementation for cluster and host levels
- protected abstract boolean doWork(long startTime, long endTime);
-
- protected abstract Long getSleepIntervalMillis();
-
- protected abstract Integer getCheckpointCutOffMultiplier();
-
- protected Long getCheckpointCutOffIntervalMillis() {
- return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
- }
-
- protected abstract boolean isDisabled();
-
- protected abstract String getCheckpointLocation();
-
- @JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
- @JsonSubTypes.Type(value = MetricHostAggregate.class)})
- @InterfaceAudience.Public
- @InterfaceStability.Unstable
- public static class MetricAggregate {
- protected Double sum = 0.0;
- protected Double deviation;
- protected Double max = Double.MIN_VALUE;
- protected Double min = Double.MAX_VALUE;
-
- public MetricAggregate() {
- }
+ /**
+ * Read metrics written during the time interval and save the sum and total
+ * in the aggregate table.
+ *
+ * @param startTime Sample start time
+ * @param endTime Sample end time
+ */
+ protected boolean doWork(long startTime, long endTime) {
+ LOG.info("Start aggregation cycle @ " + new Date() + ", " +
+ "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
- protected MetricAggregate(Double sum, Double deviation, Double max,
- Double min) {
- this.sum = sum;
- this.deviation = deviation;
- this.max = max;
- this.min = min;
- }
+ boolean success = true;
+ PhoenixTransactSQL.Condition condition =
+ prepareMetricQueryCondition(startTime, endTime);
- void updateSum(Double sum) {
- this.sum += sum;
- }
+ Connection conn = null;
+ PreparedStatement stmt = null;
- void updateMax(Double max) {
- if (max > this.max) {
- this.max = max;
+ try {
+ conn = hBaseAccessor.getConnection();
+ stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
+
+ LOG.debug("Query issued @: " + new Date());
+ ResultSet rs = stmt.executeQuery();
+ LOG.debug("Query returned @: " + new Date());
+
+ aggregate(rs, startTime, endTime);
+ LOG.info("End aggregation cycle @ " + new Date());
+
+ } catch (SQLException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } catch (IOException e) {
+ LOG.error("Exception during aggregating metrics.", e);
+ success = false;
+ } finally {
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ // Ignore
+ }
}
- }
-
- void updateMin(Double min) {
- if (min < this.min) {
- this.min = min;
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (SQLException sql) {
+ // Ignore
+ }
}
}
- @JsonProperty("sum")
- Double getSum() {
- return sum;
- }
-
- @JsonProperty("deviation")
- Double getDeviation() {
- return deviation;
- }
-
- @JsonProperty("max")
- Double getMax() {
- return max;
- }
-
- @JsonProperty("min")
- Double getMin() {
- return min;
- }
-
- public void setSum(Double sum) {
- this.sum = sum;
- }
-
- public void setDeviation(Double deviation) {
- this.deviation = deviation;
- }
-
- public void setMax(Double max) {
- this.max = max;
- }
-
- public void setMin(Double min) {
- this.min = min;
- }
-
- public String toJSON() throws IOException {
- return mapper.writeValueAsString(this);
- }
+ LOG.info("End aggregation cycle @ " + new Date());
+ return success;
}
- public static class MetricClusterAggregate extends MetricAggregate {
- private int numberOfHosts;
-
- @JsonCreator
- public MetricClusterAggregate() {
- }
+ protected abstract PhoenixTransactSQL.Condition
+ prepareMetricQueryCondition(long startTime, long endTime);
- MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfHosts = numberOfHosts;
- }
+ protected abstract void aggregate(ResultSet rs, long startTime, long endTime)
+ throws IOException, SQLException;
- @JsonProperty("numberOfHosts")
- int getNumberOfHosts() {
- return numberOfHosts;
- }
-
- void updateNumberOfHosts(int count) {
- this.numberOfHosts += count;
- }
-
- public void setNumberOfHosts(int numberOfHosts) {
- this.numberOfHosts = numberOfHosts;
- }
+ protected abstract Long getSleepIntervalMillis();
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricClusterAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfHosts(hostAggregate.getNumberOfHosts());
- }
+ protected abstract Integer getCheckpointCutOffMultiplier();
- @Override
- public String toString() {
-// MetricClusterAggregate
- return "MetricAggregate{" +
- "sum=" + sum +
- ", numberOfHosts=" + numberOfHosts +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
+ protected Long getCheckpointCutOffIntervalMillis() {
+ return getCheckpointCutOffMultiplier() * getSleepIntervalMillis();
}
- /**
- * Represents a collection of minute based aggregation of values for
- * resolution greater than a minute.
- */
- public static class MetricHostAggregate extends MetricAggregate {
-
- private long numberOfSamples = 0;
-
- @JsonCreator
- public MetricHostAggregate() {
- super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
- }
-
- public MetricHostAggregate(Double sum, int numberOfSamples,
- Double deviation,
- Double max, Double min) {
- super(sum, deviation, max, min);
- this.numberOfSamples = numberOfSamples;
- }
-
- @JsonProperty("numberOfSamples")
- long getNumberOfSamples() {
- return numberOfSamples == 0 ? 1 : numberOfSamples;
- }
-
- void updateNumberOfSamples(long count) {
- this.numberOfSamples += count;
- }
-
- public void setNumberOfSamples(long numberOfSamples) {
- this.numberOfSamples = numberOfSamples;
- }
-
- public double getAvg() {
- return sum / numberOfSamples;
- }
+ protected abstract boolean isDisabled();
- /**
- * Find and update min, max and avg for a minute
- */
- void updateAggregates(MetricHostAggregate hostAggregate) {
- updateMax(hostAggregate.getMax());
- updateMin(hostAggregate.getMin());
- updateSum(hostAggregate.getSum());
- updateNumberOfSamples(hostAggregate.getNumberOfSamples());
- }
+ protected abstract String getCheckpointLocation();
- @Override
- public String toString() {
- return "MetricHostAggregate{" +
- "sum=" + sum +
- ", numberOfSamples=" + numberOfSamples +
- ", deviation=" + deviation +
- ", max=" + max +
- ", min=" + min +
- '}';
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
index c20dd14..679ee36 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/DefaultPhoenixDataSource.java
@@ -69,6 +69,7 @@ public class DefaultPhoenixDataSource implements ConnectionProvider {
LOG.debug("Metric store connection url: " + url);
try {
// TODO: Exception is swallowed, it should be thrown - discuss it
+
connection = DriverManager.getConnection(url);
} catch (SQLException e) {
LOG.warn("Unable to connect to HBase store using Phoenix.", e);
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index d2b96ec..a3eb731 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -24,6 +25,7 @@ import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
import java.io.IOException;
import java.net.URL;
import java.sql.SQLException;
@@ -31,9 +33,13 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.HBASE_SITE_CONFIGURATION_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.METRICS_SITE_CONFIGURATION_FILE;
public class HBaseTimelineMetricStore extends AbstractService
implements TimelineMetricStore {
@@ -82,7 +88,7 @@ public class HBaseTimelineMetricStore extends AbstractService
Configuration metricsConf) {
hBaseAccessor = new PhoenixHBaseAccessor(hbaseConf, metricsConf);
hBaseAccessor.initMetricSchema();
-
+//...BUG...
// Start the cluster aggregator
TimelineMetricClusterAggregator minuteClusterAggregator =
new TimelineMetricClusterAggregator(hBaseAccessor, metricsConf);
@@ -100,16 +106,18 @@ public class HBaseTimelineMetricStore extends AbstractService
}
// Start the 5 minute aggregator
- TimelineMetricAggregatorMinute minuteHostAggregator =
- new TimelineMetricAggregatorMinute(hBaseAccessor, metricsConf);
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute
+ (hBaseAccessor, metricsConf);
if (!minuteHostAggregator.isDisabled()) {
Thread minuteAggregatorThread = new Thread(minuteHostAggregator);
minuteAggregatorThread.start();
}
// Start hourly host aggregator
- TimelineMetricAggregatorHourly hourlyHostAggregator =
- new TimelineMetricAggregatorHourly(hBaseAccessor, metricsConf);
+ TimelineMetricAggregator hourlyHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly
+ (hBaseAccessor, metricsConf);
if (!hourlyHostAggregator.isDisabled()) {
Thread aggregatorHourlyThread = new Thread(hourlyHostAggregator);
aggregatorHourlyThread.start();
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
new file mode 100644
index 0000000..61e15d7
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricAggregate.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.annotate.JsonSubTypes;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+*
+*/
+@JsonSubTypes({@JsonSubTypes.Type(value = MetricClusterAggregate.class),
+ @JsonSubTypes.Type(value = MetricHostAggregate.class)})
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class MetricAggregate {
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ protected Double sum = 0.0;
+ protected Double deviation;
+ protected Double max = Double.MIN_VALUE;
+ protected Double min = Double.MAX_VALUE;
+
+ public MetricAggregate() {
+ }
+
+ MetricAggregate(Double sum, Double deviation, Double max,
+ Double min) {
+ this.sum = sum;
+ this.deviation = deviation;
+ this.max = max;
+ this.min = min;
+ }
+
+ void updateSum(Double sum) {
+ this.sum += sum;
+ }
+
+ void updateMax(Double max) {
+ if (max > this.max) {
+ this.max = max;
+ }
+ }
+
+ void updateMin(Double min) {
+ if (min < this.min) {
+ this.min = min;
+ }
+ }
+
+ @JsonProperty("sum")
+ Double getSum() {
+ return sum;
+ }
+
+ @JsonProperty("deviation")
+ Double getDeviation() {
+ return deviation;
+ }
+
+ @JsonProperty("max")
+ Double getMax() {
+ return max;
+ }
+
+ @JsonProperty("min")
+ Double getMin() {
+ return min;
+ }
+
+ public void setSum(Double sum) {
+ this.sum = sum;
+ }
+
+ public void setDeviation(Double deviation) {
+ this.deviation = deviation;
+ }
+
+ public void setMax(Double max) {
+ this.max = max;
+ }
+
+ public void setMin(Double min) {
+ this.min = min;
+ }
+
+ public String toJSON() throws IOException {
+ return mapper.writeValueAsString(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
new file mode 100644
index 0000000..c13c85f
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricClusterAggregate.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+*
+*/
+public class MetricClusterAggregate extends MetricAggregate {
+ private int numberOfHosts;
+
+ @JsonCreator
+ public MetricClusterAggregate() {
+ }
+
+ MetricClusterAggregate(Double sum, int numberOfHosts, Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ @JsonProperty("numberOfHosts")
+ int getNumberOfHosts() {
+ return numberOfHosts;
+ }
+
+ void updateNumberOfHosts(int count) {
+ this.numberOfHosts += count;
+ }
+
+ public void setNumberOfHosts(int numberOfHosts) {
+ this.numberOfHosts = numberOfHosts;
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricClusterAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfHosts(hostAggregate.getNumberOfHosts());
+ }
+
+ @Override
+ public String toString() {
+// MetricClusterAggregate
+ return "MetricAggregate{" +
+ "sum=" + sum +
+ ", numberOfHosts=" + numberOfHosts +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
new file mode 100644
index 0000000..02cc207
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricHostAggregate.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Represents a collection of minute based aggregation of values for
+ * resolution greater than a minute.
+ */
+public class MetricHostAggregate extends MetricAggregate {
+
+ private long numberOfSamples = 0;
+
+ @JsonCreator
+ public MetricHostAggregate() {
+ super(0.0, 0.0, Double.MIN_VALUE, Double.MAX_VALUE);
+ }
+
+ public MetricHostAggregate(Double sum, int numberOfSamples,
+ Double deviation,
+ Double max, Double min) {
+ super(sum, deviation, max, min);
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ @JsonProperty("numberOfSamples")
+ long getNumberOfSamples() {
+ return numberOfSamples == 0 ? 1 : numberOfSamples;
+ }
+
+ void updateNumberOfSamples(long count) {
+ this.numberOfSamples += count;
+ }
+
+ public void setNumberOfSamples(long numberOfSamples) {
+ this.numberOfSamples = numberOfSamples;
+ }
+
+ public double getAvg() {
+ return sum / numberOfSamples;
+ }
+
+ /**
+ * Find and update min, max and avg for a minute
+ */
+ void updateAggregates(MetricHostAggregate hostAggregate) {
+ updateMax(hostAggregate.getMax());
+ updateMin(hostAggregate.getMin());
+ updateSum(hostAggregate.getSum());
+ updateNumberOfSamples(hostAggregate.getNumberOfSamples());
+ }
+
+ @Override
+ public String toString() {
+ return "MetricHostAggregate{" +
+ "sum=" + sum +
+ ", numberOfSamples=" + numberOfSamples +
+ ", deviation=" + deviation +
+ ", max=" + max +
+ ", min=" + min +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 0851d07..41eb30e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,41 +28,16 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.AbstractTimelineAggregator.MetricClusterAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractTimelineAggregator.MetricHostAggregate;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_AGGREGATE_MINUTE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.CREATE_METRICS_TABLE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_ENCODING;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.DEFAULT_TABLE_COMPRESSION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_AGGREGATE_RECORD_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_CLUSTER_AGGREGATE_TIME_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.UPSERT_METRICS_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HBASE_ENCODING_SCHEME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_HOUR_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_MINUTE_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.PRECISION_TABLE_TTL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.GLOBAL_RESULT_LIMIT;
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
/**
* Provides a facade over the Phoenix API to access HBase schema
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
new file mode 100644
index 0000000..d227993
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineClusterMetric.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+public class TimelineClusterMetric {
+ private String metricName;
+ private String appId;
+ private String instanceId;
+ private long timestamp;
+ private String type;
+
+ TimelineClusterMetric(String metricName, String appId, String instanceId,
+ long timestamp, String type) {
+ this.metricName = metricName;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ this.timestamp = timestamp;
+ this.type = type;
+ }
+
+ String getMetricName() {
+ return metricName;
+ }
+
+ String getAppId() {
+ return appId;
+ }
+
+ String getInstanceId() {
+ return instanceId;
+ }
+
+ long getTimestamp() {
+ return timestamp;
+ }
+
+ String getType() { return type; }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ TimelineClusterMetric that = (TimelineClusterMetric) o;
+
+ if (timestamp != that.timestamp) return false;
+ if (appId != null ? !appId.equals(that.appId) : that.appId != null)
+ return false;
+ if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
+ return false;
+ if (!metricName.equals(that.metricName)) return false;
+
+ return true;
+ }
+
+ public boolean equalsExceptTime(TimelineClusterMetric metric) {
+ if (!metricName.equals(metric.metricName)) return false;
+ if (!appId.equals(metric.appId)) return false;
+ if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
+ return false;
+
+ return true;
+ }
+ @Override
+ public int hashCode() {
+ int result = metricName.hashCode();
+ result = 31 * result + (appId != null ? appId.hashCode() : 0);
+ result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
+ result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineClusterMetric{" +
+ "metricName='" + metricName + '\'' +
+ ", appId='" + appId + '\'' +
+ ", instanceId='" + instanceId + '\'' +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
new file mode 100644
index 0000000..eaa1ab9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregator.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+
+public class TimelineMetricAggregator extends AbstractTimelineAggregator {
+ private static final Log LOG = LogFactory.getLog
+ (TimelineMetricAggregator.class);
+
+ private final String checkpointLocation;
+ private final Long sleepIntervalMillis;
+ private final Integer checkpointCutOffMultiplier;
+ private final String hostAggregatorDisabledParam;
+ private final String inputTableName;
+ private final String outputTableName;
+
+ public TimelineMetricAggregator(PhoenixHBaseAccessor hBaseAccessor,
+ Configuration metricsConf,
+ String checkpointLocation,
+ Long sleepIntervalMillis,
+ Integer checkpointCutOffMultiplier,
+ String hostAggregatorDisabledParam,
+ String inputTableName,
+ String outputTableName) {
+ super(hBaseAccessor, metricsConf);
+ this.checkpointLocation = checkpointLocation;
+ this.sleepIntervalMillis = sleepIntervalMillis;
+ this.checkpointCutOffMultiplier = checkpointCutOffMultiplier;
+ this.hostAggregatorDisabledParam = hostAggregatorDisabledParam;
+ this.inputTableName = inputTableName;
+ this.outputTableName = outputTableName;
+ }
+
+ @Override
+ protected String getCheckpointLocation() {
+ return checkpointLocation;
+ }
+
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws IOException, SQLException {
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ aggregateMetricsFromResultSet(rs);
+
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
+ outputTableName);
+ }
+
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ inputTableName));
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("HOSTNAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
+ }
+
+ private Map<TimelineMetric, MetricHostAggregate> aggregateMetricsFromResultSet
+ (ResultSet rs) throws IOException, SQLException {
+ TimelineMetric existingMetric = null;
+ MetricHostAggregate hostAggregate = null;
+ Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
+ new HashMap<TimelineMetric, MetricHostAggregate>();
+
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if (existingMetric == null) {
+ // First row
+ existingMetric = currentMetric;
+ hostAggregate = new MetricHostAggregate();
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ }
+
+ if (existingMetric.equalsExceptTime(currentMetric)) {
+ // Recalculate totals with current metric
+ hostAggregate.updateAggregates(currentHostAggregate);
+
+ } else {
+ // Switched over to a new metric - save existing - create new aggregate
+ hostAggregate = new MetricHostAggregate();
+ hostAggregate.updateAggregates(currentHostAggregate);
+ hostAggregateMap.put(currentMetric, hostAggregate);
+ existingMetric = currentMetric;
+ }
+ }
+ return hostAggregateMap;
+ }
+
+ @Override
+ protected Long getSleepIntervalMillis() {
+ return sleepIntervalMillis;
+ }
+
+ @Override
+ protected Integer getCheckpointCutOffMultiplier() {
+ return checkpointCutOffMultiplier;
+ }
+
+ @Override
+ protected boolean isDisabled() {
+ return metricsConf.getBoolean(hostAggregatorDisabledParam, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
new file mode 100644
index 0000000..d0dafeb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorFactory.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
+
+/**
+ *
+ */
+public class TimelineMetricAggregatorFactory {
+ private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-checkpoint";
+ private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
+ "timeline-metrics-host-aggregator-hourly-checkpoint";
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorMinute
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_CHECKPOINT_FILE);
+ long sleepInterval = metricsConf.getLong
+ (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300000l); // 5 mins
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_MINUTE_DISABLED;
+
+ String inputTableName = METRICS_RECORD_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepInterval,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName);
+ }
+
+ public static TimelineMetricAggregator createTimelineMetricAggregatorHourly
+ (PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf) {
+
+ String checkpointDir = metricsConf.get(
+ TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
+ String checkpointLocation = FilenameUtils.concat(checkpointDir,
+ MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
+ long sleepInterval = metricsConf.getLong
+ (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600000l);
+ int checkpointCutOffMultiplier = metricsConf.getInt
+ (HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
+ String hostAggregatorDisabledParam = HOST_AGGREGATOR_HOUR_DISABLED;
+
+ String inputTableName = METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+ String outputTableName = METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+
+ return new TimelineMetricAggregator(hBaseAccessor, metricsConf,
+ checkpointLocation,
+ sleepInterval,
+ checkpointCutOffMultiplier,
+ hostAggregatorDisabledParam,
+ inputTableName,
+ outputTableName);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
deleted file mode 100644
index 16f5ab9..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorHourly.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.PhoenixTransactSQL.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorHourly extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog
- (TimelineMetricAggregatorHourly.class);
- private static final String MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE =
- "timeline-metrics-host-aggregator-hourly-checkpoint";
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
-
- public TimelineMetricAggregatorHourly(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
-
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_HOURLY_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (HOST_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
- checkpointCutOffMultiplier =
- metricsConf.getInt(HOST_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date());
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
-
- ResultSet rs = stmt.executeQuery();
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- aggregateMetricsFromResultSet(rs);
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
- METRICS_AGGREGATE_HOURLY_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
-
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
- }
-
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
- private Map<TimelineMetric, MetricHostAggregate>
- aggregateMetricsFromResultSet(ResultSet rs) throws SQLException, IOException {
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
-
- } else {
- // Switched over to a new metric - save existing
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
- return hostAggregateMap;
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- protected boolean isDisabled() {
- return metricsConf.getBoolean(HOST_AGGREGATOR_HOUR_DISABLED, false);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
deleted file mode 100644
index ac9d12e..0000000
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricAggregatorMinute.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-
-import org.apache.commons.io.FilenameUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
-
-public class TimelineMetricAggregatorMinute extends AbstractTimelineAggregator {
- private static final Log LOG = LogFactory.getLog(TimelineMetricAggregatorMinute.class);
- private static final String MINUTE_AGGREGATE_CHECKPOINT_FILE =
- "timeline-metrics-host-aggregator-checkpoint";
- private final String checkpointLocation;
- private final Long sleepIntervalMillis;
- private final Integer checkpointCutOffMultiplier;
-
- public TimelineMetricAggregatorMinute(PhoenixHBaseAccessor hBaseAccessor,
- Configuration metricsConf) {
- super(hBaseAccessor, metricsConf);
-
- String checkpointDir = metricsConf.get(
- TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION);
-
- checkpointLocation = FilenameUtils.concat(checkpointDir,
- MINUTE_AGGREGATE_CHECKPOINT_FILE);
-
- sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
- (HOST_AGGREGATOR_MINUTE_SLEEP_INTERVAL, 300l)); // 5 mins
- checkpointCutOffMultiplier =
- metricsConf.getInt(HOST_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER, 3);
- }
-
- @Override
- protected String getCheckpointLocation() {
- return checkpointLocation;
- }
-
- @Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
- boolean success = true;
- Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
- METRICS_RECORD_TABLE_NAME));
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("HOSTNAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
- LOG.debug("Query issued @: " + new Date());
- ResultSet rs = stmt.executeQuery();
- LOG.debug("Query returned @: " + new Date());
- TimelineMetric existingMetric = null;
- MetricHostAggregate hostAggregate = null;
-
- Map<TimelineMetric, MetricHostAggregate> hostAggregateMap =
- new HashMap<TimelineMetric, MetricHostAggregate>();
-
- while (rs.next()) {
- TimelineMetric currentMetric =
- PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
- MetricHostAggregate currentHostAggregate =
- PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
-
- if (existingMetric == null) {
- // First row
- existingMetric = currentMetric;
- hostAggregate = new MetricHostAggregate();
- hostAggregateMap.put(currentMetric, hostAggregate);
- }
-
- if (existingMetric.equalsExceptTime(currentMetric)) {
- // Recalculate totals with current metric
- hostAggregate.updateAggregates(currentHostAggregate);
-
- } else {
- // Switched over to a new metric - create new aggregate
- hostAggregate = new MetricHostAggregate();
- hostAggregate.updateAggregates(currentHostAggregate);
- hostAggregateMap.put(currentMetric, hostAggregate);
- existingMetric = currentMetric;
- }
- }
-
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveHostAggregateRecords(hostAggregateMap,
- METRICS_AGGREGATE_MINUTE_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
-
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
- }
-
- @Override
- protected Long getSleepIntervalMillis() {
- return sleepIntervalMillis;
- }
-
- @Override
- protected Integer getCheckpointCutOffMultiplier() {
- return checkpointCutOffMultiplier;
- }
-
- @Override
- protected boolean isDisabled() {
- return metricsConf.getBoolean(HOST_AGGREGATOR_MINUTE_DISABLED, false);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
index c52451e..96de1a9 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregator.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -24,27 +25,18 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_SQL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.prepareGetMetricsSqlStmt;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_MINUTE_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.PhoenixTransactSQL.*;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline.TimelineMetricConfiguration.*;
/**
* Aggregates a metric across all hosts in the cluster. Reads metrics from
@@ -82,98 +74,79 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return checkpointLocation;
}
- /**
- * Read metrics written during the time interval and save the sum and total
- * in the aggregate table.
- *
- * @param startTime Sample start time
- * @param endTime Sample end time
- */
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn;
- PreparedStatement stmt;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
- LOG.debug("Query issued @: " + new Date());
- ResultSet rs = stmt.executeQuery();
- LOG.debug("Query returned @: " + new Date());
- Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
- new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
- List<Long[]> timeSlices = new ArrayList<Long[]>();
- // Create time slices
- long sliceStartTime = startTime;
- while (sliceStartTime < endTime) {
- timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
- sliceStartTime += timeSliceIntervalMillis;
- }
-
- while (rs.next()) {
- TimelineMetric metric =
- PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
-
- Map<TimelineClusterMetric, Double> clusterMetrics =
- sliceFromTimelineMetric(metric, timeSlices);
-
- if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
- for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
- clusterMetrics.entrySet()) {
- TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
- MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
- Double avgValue = clusterMetricEntry.getValue();
-
- if (aggregate == null) {
- aggregate = new MetricClusterAggregate(avgValue, 1, null,
- avgValue, avgValue);
- aggregateClusterMetrics.put(clusterMetric, aggregate);
- } else {
- aggregate.updateSum(avgValue);
- aggregate.updateNumberOfHosts(1);
- aggregate.updateMax(avgValue);
- aggregate.updateMin(avgValue);
- }
- }
- }
- }
- LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
-
- hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
-
- LOG.info("End aggregation cycle @ " + new Date());
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- }
-
- return success;
+ @Override
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
+ List<Long[]> timeSlices = getTimeSlices(startTime, endTime);
+ Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateClusterMetrics = aggregateMetricsFromResultSet(rs, timeSlices);
+
+ LOG.info("Saving " + aggregateClusterMetrics.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateRecords(aggregateClusterMetrics);
}
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime, long endTime) {
Condition condition = new Condition(null, null, null, null, startTime,
- endTime, null, true);
- condition.setFetchSize(resultsetFetchSize);
+ endTime, null, true);
condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
condition.setStatement(String.format(GET_METRIC_SQL,
METRICS_RECORD_TABLE_NAME));
condition.addOrderByColumn("METRIC_NAME");
condition.addOrderByColumn("APP_ID");
condition.addOrderByColumn("INSTANCE_ID");
condition.addOrderByColumn("SERVER_TIME");
-
return condition;
}
+ private List<Long[]> getTimeSlices(long startTime, long endTime) {
+ List<Long[]> timeSlices = new ArrayList<Long[]>();
+ long sliceStartTime = startTime;
+ while (sliceStartTime < endTime) {
+ timeSlices.add(new Long[] { sliceStartTime, sliceStartTime + timeSliceIntervalMillis});
+ sliceStartTime += timeSliceIntervalMillis;
+ }
+ return timeSlices;
+ }
+
+ private Map<TimelineClusterMetric, MetricClusterAggregate>
+ aggregateMetricsFromResultSet(ResultSet rs, List<Long[]> timeSlices)
+ throws SQLException, IOException {
+ Map<TimelineClusterMetric, MetricClusterAggregate> aggregateClusterMetrics =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+ // Create time slices
+
+ while (rs.next()) {
+ TimelineMetric metric =
+ PhoenixHBaseAccessor.getTimelineMetricFromResultSet(rs);
+
+ Map<TimelineClusterMetric, Double> clusterMetrics =
+ sliceFromTimelineMetric(metric, timeSlices);
+
+ if (clusterMetrics != null && !clusterMetrics.isEmpty()) {
+ for (Map.Entry<TimelineClusterMetric, Double> clusterMetricEntry :
+ clusterMetrics.entrySet()) {
+ TimelineClusterMetric clusterMetric = clusterMetricEntry.getKey();
+ MetricClusterAggregate aggregate = aggregateClusterMetrics.get(clusterMetric);
+ Double avgValue = clusterMetricEntry.getValue();
+
+ if (aggregate == null) {
+ aggregate = new MetricClusterAggregate(avgValue, 1, null,
+ avgValue, avgValue);
+ aggregateClusterMetrics.put(clusterMetric, aggregate);
+ } else {
+ aggregate.updateSum(avgValue);
+ aggregate.updateNumberOfHosts(1);
+ aggregate.updateMax(avgValue);
+ aggregate.updateMin(avgValue);
+ }
+ }
+ }
+ }
+ return aggregateClusterMetrics;
+ }
+
@Override
protected Long getSleepIntervalMillis() {
return sleepIntervalMillis;
@@ -239,82 +212,4 @@ public class TimelineMetricClusterAggregator extends AbstractTimelineAggregator
return -1l;
}
- public static class TimelineClusterMetric {
- private String metricName;
- private String appId;
- private String instanceId;
- private long timestamp;
- private String type;
-
- TimelineClusterMetric(String metricName, String appId, String instanceId,
- long timestamp, String type) {
- this.metricName = metricName;
- this.appId = appId;
- this.instanceId = instanceId;
- this.timestamp = timestamp;
- this.type = type;
- }
-
- String getMetricName() {
- return metricName;
- }
-
- String getAppId() {
- return appId;
- }
-
- String getInstanceId() {
- return instanceId;
- }
-
- long getTimestamp() {
- return timestamp;
- }
-
- String getType() { return type; }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimelineClusterMetric that = (TimelineClusterMetric) o;
-
- if (timestamp != that.timestamp) return false;
- if (appId != null ? !appId.equals(that.appId) : that.appId != null)
- return false;
- if (instanceId != null ? !instanceId.equals(that.instanceId) : that.instanceId != null)
- return false;
- if (!metricName.equals(that.metricName)) return false;
-
- return true;
- }
-
- public boolean equalsExceptTime(TimelineClusterMetric metric) {
- if (!metricName.equals(metric.metricName)) return false;
- if (!appId.equals(metric.appId)) return false;
- if (instanceId != null ? !instanceId.equals(metric.instanceId) : metric.instanceId != null)
- return false;
-
- return true;
- }
- @Override
- public int hashCode() {
- int result = metricName.hashCode();
- result = 31 * result + (appId != null ? appId.hashCode() : 0);
- result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
- result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
- return result;
- }
-
- @Override
- public String toString() {
- return "TimelineClusterMetric{" +
- "metricName='" + metricName + '\'' +
- ", appId='" + appId + '\'' +
- ", instanceId='" + instanceId + '\'' +
- ", timestamp=" + timestamp +
- '}';
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
index e886b71..54d3fdd 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricClusterAggregatorHourly.java
@@ -24,11 +24,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -40,19 +37,7 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.PhoenixTransactSQL.*;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricClusterAggregator.TimelineClusterMetric;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline.TimelineMetricConfiguration.*;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration.DEFAULT_CHECKPOINT_LOCATION;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
- .timeline.TimelineMetricConfiguration
- .TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR;
public class TimelineMetricClusterAggregatorHourly extends
AbstractTimelineAggregator {
@@ -77,7 +62,8 @@ public class TimelineMetricClusterAggregatorHourly extends
sleepIntervalMillis = SECONDS.toMillis(metricsConf.getLong
(CLUSTER_AGGREGATOR_HOUR_SLEEP_INTERVAL, 3600l));
- checkpointCutOffIntervalMillis = 7200000l;
+ checkpointCutOffIntervalMillis = SECONDS.toMillis(metricsConf.getLong
+ (CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL, 7200l));
checkpointCutOffMultiplier = metricsConf.getInt
(CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER, 2);
}
@@ -88,60 +74,31 @@ public class TimelineMetricClusterAggregatorHourly extends
}
@Override
- protected boolean doWork(long startTime, long endTime) {
- LOG.info("Start aggregation cycle @ " + new Date() + ", " +
- "startTime = " + new Date(startTime) + ", endTime = " + new Date
- (endTime));
-
- boolean success = true;
- Condition condition = prepareMetricQueryCondition(startTime, endTime);
-
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try {
- conn = hBaseAccessor.getConnection();
- stmt = prepareGetMetricsSqlStmt(conn, condition);
-
- ResultSet rs = stmt.executeQuery();
+ protected void aggregate(ResultSet rs, long startTime, long endTime)
+ throws SQLException, IOException {
Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
aggregateMetricsFromResultSet(rs);
- LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
-
- hBaseAccessor.saveClusterAggregateHourlyRecords(
- hostAggregateMap,
- METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
-
- } catch (SQLException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } catch (IOException e) {
- LOG.error("Exception during aggregating metrics.", e);
- success = false;
- } finally {
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- // Ignore
- }
- }
- if (conn != null) {
- try {
- conn.close();
- } catch (SQLException sql) {
- // Ignore
- }
- }
- }
+ LOG.info("Saving " + hostAggregateMap.size() + " metric aggregates.");
+ hBaseAccessor.saveClusterAggregateHourlyRecords(hostAggregateMap,
+ METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ }
- LOG.info("End aggregation cycle @ " + new Date());
- return success;
+ @Override
+ protected Condition prepareMetricQueryCondition(long startTime,
+ long endTime) {
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setNoLimit();
+ condition.setFetchSize(resultsetFetchSize);
+ condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
+ condition.addOrderByColumn("METRIC_NAME");
+ condition.addOrderByColumn("APP_ID");
+ condition.addOrderByColumn("INSTANCE_ID");
+ condition.addOrderByColumn("SERVER_TIME");
+ return condition;
}
- // should rewrite from host agg to cluster agg
- //
private Map<TimelineClusterMetric, MetricHostAggregate>
aggregateMetricsFromResultSet(ResultSet rs) throws IOException, SQLException {
@@ -189,19 +146,6 @@ public class TimelineMetricClusterAggregatorHourly extends
agg.updateNumberOfSamples(currentClusterAggregate.getNumberOfHosts());
}
- private Condition prepareMetricQueryCondition(long startTime, long endTime) {
- Condition condition = new Condition
- (null, null, null, null, startTime, endTime, null, true);
- condition.setNoLimit();
- condition.setFetchSize(resultsetFetchSize);
- condition.setStatement(GET_CLUSTER_AGGREGATE_SQL);
- condition.addOrderByColumn("METRIC_NAME");
- condition.addOrderByColumn("APP_ID");
- condition.addOrderByColumn("INSTANCE_ID");
- condition.addOrderByColumn("SERVER_TIME");
- return condition;
- }
-
@Override
protected Long getSleepIntervalMillis() {
return sleepIntervalMillis;
http://git-wip-us.apache.org/repos/asf/ambari/blob/1d817954/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 6b19847..e022e5e 100644
--- a/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics/ambari-metrics-hadoop-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -86,6 +86,9 @@ public interface TimelineMetricConfiguration {
public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_MULTIPLIER =
"timeline.metrics.cluster.aggregator.hourly.checkpointCutOffMultiplier";
+ public static final String CLUSTER_AGGREGATOR_HOUR_CHECKPOINT_CUTOFF_INTERVAL =
+ "timeline.metrics.cluster.aggregator.hourly.checkpointCutOffInterval";
+
public static final String GLOBAL_RESULT_LIMIT =
"timeline.metrics.service.default.result.limit";