You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2014/12/02 18:28:26 UTC
[10/30] ambari git commit: AMBARI-5707. Replace Ganglia with high
performant and pluggable Metrics System. (swagle)
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java
new file mode 100644
index 0000000..4411be5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestRestMetricsSender.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .loadsimulator.net;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestRestMetricsSender {
+
+  /**
+   * Verifies that pushMetrics() forwards the payload to the connected
+   * UrlService, returns the service's response, and disconnects afterwards.
+   */
+  @Test
+  public void testPushMetrics() throws Exception {
+    final UrlService svcMock = createStrictMock(UrlService.class);
+    final String payload = "test";
+    final String expectedResponse = "mockResponse";
+
+    expect(svcMock.send(anyString())).andReturn(expectedResponse);
+    svcMock.disconnect();
+    expectLastCall();
+
+    replay(svcMock);
+
+    // Override the factory method so the sender talks to the mock instead of
+    // opening a real HTTP connection.
+    RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+      @Override
+      protected UrlService getConnectedUrlService() throws IOException {
+        return svcMock;
+      }
+    };
+    String response = sender.pushMetrics(payload);
+
+    verify(svcMock);
+    // Descriptive message instead of the previous empty-string message.
+    assertEquals("Should return the response from the url service",
+      expectedResponse, response);
+  }
+
+  /**
+   * Verifies that when send() fails with an IOException the sender still
+   * disconnects the service and does not propagate the exception.
+   */
+  @Test
+  public void testPushMetricsFailed() throws Exception {
+    final UrlService svcMock = createStrictMock(UrlService.class);
+    final String payload = "test";
+    RestMetricsSender sender = new RestMetricsSender("expectedHostName") {
+      @Override
+      protected UrlService getConnectedUrlService() throws IOException {
+        return svcMock;
+      }
+    };
+
+    expect(svcMock.send(anyString())).andThrow(new IOException());
+    svcMock.disconnect();
+    expectLastCall();
+
+    replay(svcMock);
+
+    // NOTE(review): the return value on failure is not specified here, so we
+    // only verify the interactions (send was attempted, disconnect happened).
+    sender.pushMetrics(payload);
+
+    verify(svcMock);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java
new file mode 100644
index 0000000..7e29ae3
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/net/TestStdOutMetricsSender.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.net;
+
+
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+/**
+ * Smoke test for {@link StdOutMetricsSender}: exercises pushMetrics() with a
+ * captured PrintStream so the sender's console output can be inspected.
+ */
+public class TestStdOutMetricsSender {
+
+ @Test
+ public void testPushMetrics() throws Exception {
+ // Capture the sender's output instead of letting it hit real stdout.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream out = new PrintStream(baos);
+ StdOutMetricsSender sender = new StdOutMetricsSender("expectedHostName", out);
+ sender.pushMetrics("test");
+
+ // NOTE(review): this test has no assertion — it only proves pushMetrics()
+ // does not throw. The exact output format of StdOutMetricsSender is not
+ // visible here; once it is pinned down, assert on baos.toString() instead
+ // of printing it.
+ System.out.println(baos.toString());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java
new file mode 100644
index 0000000..462aaf0
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestRandomMetricsProvider.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link RandomMetricsProvider}: generated values must fall strictly
+ * between the configured lower and upper bounds.
+ */
+public class TestRandomMetricsProvider {
+
+  @Test
+  public void testReturnSingle() {
+    final double lower = 5.25;
+    final double upper = 5.40;
+    RandomMetricsProvider provider = new RandomMetricsProvider(lower, upper);
+
+    double value = provider.next();
+
+    boolean inRange = lower < value && value < upper;
+    assertTrue("Generated metric should be in range", inRange);
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
new file mode 100644
index 0000000..dd513aa
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/loadsimulator/util/TestTimeStampProvider.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.loadsimulator.util;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TimeStampProvider}: verifies single-timestamp generation
+ * and the batch of collection timestamps produced for one send interval.
+ */
+public class TestTimeStampProvider {
+
+ @Test
+ public void testReturnSingle() {
+ long startTime = 1411663170112L;
+ int timeStep = 5000;
+ TimeStampProvider tm = new TimeStampProvider(startTime, timeStep, 0);
+
+ long tStamp = tm.next();
+
+ // The very first timestamp must equal the configured start time.
+ assertEquals("First generated timestamp should match starttime", startTime, tStamp);
+ }
+
+ @Test
+ public void testReturnTstampsForSendInterval() throws Exception {
+ long startTime = 0;
+ int collectInterval = 5;
+ int sendInterval = 30;
+ TimeStampProvider tsp = new TimeStampProvider(startTime, collectInterval, sendInterval);
+
+ long[] timestamps = tsp.timestampsForNextInterval();
+
+ // One send interval of 30 with a collect interval of 5 yields exactly
+ // 6 collection timestamps: 0, 5, 10, 15, 20, 25.
+ assertThat(timestamps)
+ .hasSize(6)
+ .containsOnly(0, 5, 10, 15, 20, 25);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
new file mode 100644
index 0000000..96b8a83
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
+
+ protected static final long BATCH_SIZE = 3;
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ Map<String, String> props = getDefaultProps();
+ props.put(QueryServices.QUEUE_SIZE_ATTRIB, Integer.toString(5000));
+ props.put(IndexWriterUtils.HTABLE_THREAD_KEY, Integer.toString(100));
+ // Make a small batch size to test multiple calls to reserve sequences
+ props.put(QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB,
+ Long.toString(BATCH_SIZE));
+ // Must update config before starting server
+ setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+ }
+
+ @AfterClass
+ public static void doTeardown() throws Exception {
+ dropNonSystemTables();
+ }
+
+ @After
+ public void cleanUpAfterTest() throws Exception {
+ deletePriorTables(HConstants.LATEST_TIMESTAMP, getUrl());
+ }
+
+ public static Map<String, String> getDefaultProps() {
+ Map<String, String> props = new HashMap<String, String>();
+ // Must update config before starting server
+ props.put(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB,
+ Boolean.FALSE.toString());
+ props.put("java.security.krb5.realm", "");
+ props.put("java.security.krb5.kdc", "");
+ return props;
+ }
+
+ protected Connection getConnection(String url) throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ return conn;
+ }
+
+ /**
+ * A canary test. Will show if the infrastructure is set-up correctly.
+ */
+ @Test
+ public void testClusterOK() throws Exception {
+ Connection conn = getConnection(getUrl());
+ conn.setAutoCommit(true);
+
+ String sampleDDL = "CREATE TABLE TEST_METRICS " +
+ "(TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) " +
+ "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, " +
+ "TTL=86400, COMPRESSION='NONE' ";
+
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate(sampleDDL);
+ conn.commit();
+
+ ResultSet rs = stmt.executeQuery(
+ "SELECT COUNT(TEST_COLUMN) FROM TEST_METRICS");
+
+ rs.next();
+ long l = rs.getLong(1);
+ assertThat(l).isGreaterThanOrEqualTo(0);
+
+ stmt.execute("DROP TABLE TEST_METRICS");
+ conn.close();
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
new file mode 100644
index 0000000..1430478
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractPhoenixConnectionlessTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.jdbc.PhoenixTestDriver;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import static org.apache.phoenix.util.PhoenixRuntime.TENANT_ID_ATTRIB;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for tests that run against the Phoenix "connectionless" JDBC
+ * url — no real HBase cluster is started; a PhoenixTestDriver is registered
+ * with DriverManager for the duration of each test.
+ */
+public abstract class AbstractPhoenixConnectionlessTest extends BaseTest {
+
+ /** @return the connectionless JDBC url used by all tests in this class. */
+ protected static String getUrl() {
+ return TestUtil.PHOENIX_CONNECTIONLESS_JDBC_URL;
+ }
+
+ /** @return the connectionless url scoped to the given tenant. */
+ protected static String getUrl(String tenantId) {
+ return getUrl() + ';' + TENANT_ID_ATTRIB + '=' + tenantId;
+ }
+
+ // Shared test driver; registered lazily in initDriver() and torn down
+ // (deregistered) in tearDown(). Null means "not registered".
+ protected static PhoenixTestDriver driver;
+
+ private static void startServer(String url) throws Exception {
+ assertNull(driver);
+ // only load the test driver if we are testing locally - for integration tests, we want to
+ // test on a wider scale
+ if (PhoenixEmbeddedDriver.isTestUrl(url)) {
+ driver = initDriver(ReadOnlyProps.EMPTY_PROPS);
+ assertTrue(DriverManager.getDriver(url) == driver);
+ driver.connect(url, PropertiesUtil.deepCopy(TEST_PROPERTIES));
+ }
+ }
+
+ // Synchronized so concurrent test classes cannot register two drivers.
+ protected static synchronized PhoenixTestDriver initDriver(ReadOnlyProps props) throws Exception {
+ if (driver == null) {
+ driver = new PhoenixTestDriver(props);
+ DriverManager.registerDriver(driver);
+ }
+ return driver;
+ }
+
+ private String connUrl;
+
+ @Before
+ public void setup() throws Exception {
+ connUrl = getUrl();
+ startServer(connUrl);
+ }
+
+ /**
+ * Sanity check: DDL can be executed through the connectionless driver.
+ */
+ @Test
+ public void testStorageSystemInitialized() throws Exception {
+ String sampleDDL = "CREATE TABLE TEST_METRICS (TEST_COLUMN VARCHAR " +
+ "CONSTRAINT pk PRIMARY KEY (TEST_COLUMN)) DATA_BLOCK_ENCODING='FAST_DIFF', " +
+ "IMMUTABLE_ROWS=true, TTL=86400, COMPRESSION='SNAPPY'";
+
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ try {
+ conn = DriverManager.getConnection(connUrl);
+ stmt = conn.prepareStatement(sampleDDL);
+ stmt.execute();
+ conn.commit();
+ } finally {
+ if (stmt != null) {
+ stmt.close();
+ }
+ if (conn != null) {
+ conn.close();
+ }
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (driver != null) {
+ try {
+ driver.close();
+ } finally {
+ // Null the field before deregistering so a failed deregister does not
+ // leave a closed driver referenced by the next test.
+ PhoenixTestDriver phoenixTestDriver = driver;
+ driver = null;
+ DriverManager.deregisterDriver(phoenixTestDriver);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
new file mode 100644
index 0000000..f7e53f5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITClusterAggregator.java
@@ -0,0 +1,384 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_CLUSTER_AGGREGATE_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+
+/**
+ * Integration tests for cluster-level metric aggregation: minute-level
+ * aggregation of per-host records (TimelineMetricClusterAggregator) and
+ * hourly roll-ups of those aggregates (TimelineMetricClusterAggregatorHourly),
+ * running against the mini HBase cluster from the base class.
+ */
+public class ITClusterAggregator extends AbstractMiniHBaseClusterTest {
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ // inits connection, starts mini cluster
+ conn = getConnection(getUrl());
+
+ hdb.initMetricSchema();
+ }
+
+ // Wipe all metric tables between tests so aggregations don't bleed over.
+ @After
+ public void tearDown() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+
+ stmt.close();
+ conn.close();
+ }
+
+ /**
+ * Two hosts report "disk_free" over two minutes; the cluster aggregate for
+ * each timeslice should span both hosts (min/max/sum across hosts).
+ */
+ @Test
+ public void testShouldAggregateClusterProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregator agg =
+ new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 2));
+ ctime += minute;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 1));
+
+ // WHEN
+ long endTime = ctime + minute;
+ // NOTE(review): `success` is never asserted; consider assertTrue(success).
+ boolean success = agg.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(3.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ // NOTE(review): recordCount is accumulated but never asserted, so this
+ // test passes even if the query returns zero rows. Assert an expected
+ // count (or at least recordCount > 0) once it is known.
+ }
+
+
+ /**
+ * Like the test above, but with two distinct metric names; each metric must
+ * be aggregated independently (disk_free across 2 hosts, disk_used on 1).
+ */
+ @Test
+ public void testShouldAggregateDifferentMetricsOnClusterProperly()
+ throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregator agg =
+ new TimelineMetricClusterAggregator(hdb, new Configuration());
+
+ // here we put some metrics that will be aggregated
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_used", 1));
+
+ ctime += minute;
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_free", 2));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local2",
+ "disk_free", 1));
+ hdb.insertMetricRecords(prepareSingleTimelineMetric(ctime, "local1",
+ "disk_used", 1));
+
+ // WHEN
+ long endTime = ctime + minute;
+ // NOTE(review): `success` is never asserted here either.
+ boolean success = agg.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_CLUSTER_AGGREGATE_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA)));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ int recordCount = 0;
+ while (rs.next()) {
+ TimelineClusterMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricClusterKeyFromResultSet(rs);
+ MetricClusterAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricClusterAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2, currentHostAggregate.getNumberOfHosts());
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(3.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else if ("disk_used".equals(currentMetric.getMetricName())) {
+ assertEquals(1, currentHostAggregate.getNumberOfHosts());
+ assertEquals(1.0, currentHostAggregate.getMax());
+ assertEquals(1.0, currentHostAggregate.getMin());
+ assertEquals(1.0, currentHostAggregate.getSum());
+ recordCount++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ // NOTE(review): recordCount again unasserted — see previous test.
+ }
+
+
+ /**
+ * Four minute-level cluster aggregates of one metric must roll up into a
+ * single hourly row whose sum/count/max/min cover all four slices.
+ */
+ @Test
+ public void testShouldAggregateClusterOnHourProperly() throws Exception {
+ // GIVEN
+ TimelineMetricClusterAggregatorHourly agg =
+ new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+ // this time can be virtualized! or made independent from real clock
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ // Four slices, one minute apart, each: sum=4.0, 2 hosts, min=0, max=4.
+ records.put(createEmptyTimelineMetric(ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric(ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+
+ hdb.saveClusterAggregateRecords(records);
+
+ // WHEN
+ agg.doWork(startTime, ctime + minute);
+
+ // THEN
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+ int count = 0;
+ while (rs.next()) {
+ assertEquals("METRIC_NAME", "disk_used", rs.getString("METRIC_NAME"));
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ count++;
+ }
+
+ assertEquals("One hourly aggregated row expected ", 1, count);
+ }
+
+ /**
+ * Hourly roll-up with two metric names must produce one hourly row per
+ * metric, each aggregated only from its own slices.
+ */
+ @Test
+ public void testShouldAggregateDifferentMetricsOnHourProperly() throws
+ Exception {
+ // GIVEN
+ TimelineMetricClusterAggregatorHourly agg =
+ new TimelineMetricClusterAggregatorHourly(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ Map<TimelineClusterMetric, MetricClusterAggregate> records =
+ new HashMap<TimelineClusterMetric, MetricClusterAggregate>();
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ records.put(createEmptyTimelineMetric("disk_used", ctime += minute),
+ new MetricClusterAggregate(4.0, 2, 0.0, 4.0, 0.0));
+ records.put(createEmptyTimelineMetric("disk_free", ctime),
+ new MetricClusterAggregate(1.0, 2, 0.0, 1.0, 1.0));
+
+ hdb.saveClusterAggregateRecords(records);
+
+ // WHEN
+ agg.doWork(startTime, ctime + minute);
+
+ // THEN
+ ResultSet rs = executeQuery("SELECT * FROM METRIC_AGGREGATE_HOURLY");
+ int count = 0;
+ while (rs.next()) {
+ if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
+ } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
+ assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
+ assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
+ assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
+ }
+
+ count++;
+ }
+
+ assertEquals("Two hourly aggregated row expected ", 2, count);
+ }
+
+ // NOTE(review): opens a Connection/Statement that are never closed (the
+ // returned ResultSet keeps them alive); acceptable for short-lived tests
+ // but a leak pattern nonetheless.
+ private ResultSet executeQuery(String query) throws SQLException {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+ return stmt.executeQuery(query);
+ }
+
+ // Builds a cluster-metric key for app "test_app" with the given name/time.
+ private TimelineClusterMetric createEmptyTimelineMetric(String name,
+ long startTime) {
+ TimelineClusterMetric metric = new TimelineClusterMetric(name,
+ "test_app", null, startTime, null);
+
+ return metric;
+ }
+
+ private TimelineClusterMetric createEmptyTimelineMetric(long startTime) {
+ return createEmptyTimelineMetric("disk_used", startTime);
+ }
+
+ // NOTE(review): currently unused in this class.
+ private MetricHostAggregate
+ createMetricHostAggregate(double max, double min, int numberOfSamples,
+ double sum) {
+ MetricHostAggregate expectedAggregate =
+ new MetricHostAggregate();
+ expectedAggregate.setMax(max);
+ expectedAggregate.setMin(min);
+ expectedAggregate.setNumberOfSamples(numberOfSamples);
+ expectedAggregate.setSum(sum);
+
+ return expectedAggregate;
+ }
+
+ // Accessor wired to the mini cluster's Phoenix url, with compression off
+ // so the test schema works without native codecs.
+ private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(
+ TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+ return
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ metricsConf,
+ new ConnectionProvider() {
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
+ }
+
+ // Wraps a single metric for one host in a TimelineMetrics envelope.
+ private TimelineMetrics prepareSingleTimelineMetric(long startTime,
+ String host,
+ String metricName,
+ double val) {
+ TimelineMetrics m = new TimelineMetrics();
+ m.setMetrics(Arrays.asList(
+ createTimelineMetric(startTime, metricName, host, val)));
+
+ return m;
+ }
+
+ // Builds one metric with four 15s-spaced samples of the same value.
+ private TimelineMetric createTimelineMetric(long startTime,
+ String metricName,
+ String host,
+ double val) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, val);
+ vals.put(startTime + 30000l, val);
+ vals.put(startTime + 45000l, val);
+ vals.put(startTime + 60000l, val);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
new file mode 100644
index 0000000..d166a22
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITMetricAggregator.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.GET_METRIC_AGGREGATE_ONLY_SQL;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.LOG;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.NATIVE_TIME_RANGE_DELTA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests for host-level metric aggregation against a Phoenix/HBase
+ * mini cluster: verifies raw record insertion plus the minute and hourly
+ * host-aggregate roll-ups.
+ */
+public class ITMetricAggregator extends AbstractMiniHBaseClusterTest {
+ private Connection conn;
+ private PhoenixHBaseAccessor hdb;
+
+ @Before
+ public void setUp() throws Exception {
+ hdb = createTestableHBaseAccessor();
+ // inits connection, starts mini cluster
+ conn = getConnection(getUrl());
+
+ hdb.initMetricSchema();
+ }
+
+ // Clears all metric tables between tests so aggregates don't leak across
+ // test methods.
+ // NOTE(review): the local `conn` shadows the instance field `conn`; the
+ // field's connection opened in setUp() is never closed here — confirm
+ // whether the base class cleans it up.
+ @After
+ public void tearDown() throws Exception {
+ Connection conn = getConnection(getUrl());
+ Statement stmt = conn.createStatement();
+
+ stmt.execute("delete from METRIC_AGGREGATE");
+ stmt.execute("delete from METRIC_AGGREGATE_HOURLY");
+ stmt.execute("delete from METRIC_RECORD");
+ stmt.execute("delete from METRIC_RECORD_HOURLY");
+ stmt.execute("delete from METRIC_RECORD_MINUTE");
+ conn.commit();
+
+ stmt.close();
+ conn.close();
+ }
+
+ // Round-trips two metric series ("mem_free", "disk_free") through the
+ // store and asserts they are read back unchanged (ignoring timestamps).
+ @Test
+ public void testShouldInsertMetrics() throws Exception {
+ // GIVEN
+
+ // WHEN
+ long startTime = System.currentTimeMillis();
+ TimelineMetrics metricsSent = prepareTimelineMetrics(startTime, "local");
+ hdb.insertMetricRecords(metricsSent);
+
+ Condition queryCondition = new Condition(null, "local", null, null,
+ startTime, startTime + (15 * 60 * 1000), null, false);
+ TimelineMetrics recordRead = hdb.getMetricRecords(queryCondition);
+
+ // THEN
+ assertThat(recordRead.getMetrics()).hasSize(2)
+ .extracting("metricName")
+ .containsOnly("mem_free", "disk_free");
+
+ assertThat(metricsSent.getMetrics())
+ .usingElementComparator(TIME_IGNORING_COMPARATOR)
+ .containsExactlyElementsOf(recordRead.getMetrics());
+ }
+
+ // Inserts five minutes of raw records, runs the minute aggregator, and
+ // checks the per-host aggregate (max/min/samples/sum/avg) for both metrics.
+ @Test
+ public void testShouldAggregateMinuteProperly() throws Exception {
+ // GIVEN
+// TimelineMetricAggregatorMinute aggregatorMinute =
+// new TimelineMetricAggregatorMinute(hdb, new Configuration());
+ TimelineMetricAggregator aggregatorMinute = TimelineMetricAggregatorFactory
+ .createTimelineMetricAggregatorMinute(hdb, new Configuration());
+
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+ hdb.insertMetricRecords(prepareTimelineMetrics(startTime, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+ hdb.insertMetricRecords(prepareTimelineMetrics(ctime += minute, "local"));
+
+ // WHEN
+ long endTime = startTime + 1000 * 60 * 4;
+ // NOTE(review): `success` is never asserted here, unlike the hourly test
+ // below — consider adding assertTrue(success).
+ boolean success = aggregatorMinute.doWork(startTime, endTime);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_AGGREGATE_MINUTE_TABLE_NAME));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+ // NOTE(review): `expectedAggregate` is unused in this test; the expected
+ // values are asserted inline below instead.
+ MetricHostAggregate expectedAggregate =
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+
+ int count = 0;
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if ("disk_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
+ } else if ("mem_free".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ count++;
+ } else {
+ fail("Unexpected entry");
+ }
+ }
+ assertEquals("Two aggregated entries expected", 2, count);
+ }
+
+ // Seeds an hour's worth (12 x 5 min) of minute-level aggregates, runs the
+ // hourly aggregator, and checks the rolled-up totals for "disk_used".
+ @Test
+ public void testShouldAggregateHourProperly() throws Exception {
+ // GIVEN
+// TimelineMetricAggregatorHourly aggregator =
+// new TimelineMetricAggregatorHourly(hdb, new Configuration());
+
+ TimelineMetricAggregator aggregator = TimelineMetricAggregatorFactory
+ .createTimelineMetricAggregatorHourly(hdb, new Configuration());
+ long startTime = System.currentTimeMillis();
+
+ MetricHostAggregate expectedAggregate =
+ createMetricHostAggregate(2.0, 0.0, 20, 15.0);
+ Map<TimelineMetric, MetricHostAggregate>
+ aggMap = new HashMap<TimelineMetric,
+ MetricHostAggregate>();
+
+ int min_5 = 5 * 60 * 1000;
+ long ctime = startTime - min_5;
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+ aggMap.put(createEmptyTimelineMetric(ctime += min_5), expectedAggregate);
+
+ hdb.saveHostAggregateRecords(aggMap, METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+ //WHEN
+ long endTime = ctime + min_5;
+ boolean success = aggregator.doWork(startTime, endTime);
+ assertTrue(success);
+
+ //THEN
+ Condition condition = new Condition(null, null, null, null, startTime,
+ endTime, null, true);
+ condition.setStatement(String.format(GET_METRIC_AGGREGATE_ONLY_SQL,
+ PhoenixTransactSQL.getNaiveTimeRangeHint(startTime, NATIVE_TIME_RANGE_DELTA),
+ METRICS_AGGREGATE_HOURLY_TABLE_NAME));
+
+ PreparedStatement pstmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt
+ (conn, condition);
+ ResultSet rs = pstmt.executeQuery();
+
+ // NOTE(review): if the query returns no rows this loop never executes and
+ // the test passes vacuously — consider counting matches and asserting.
+ while (rs.next()) {
+ TimelineMetric currentMetric =
+ PhoenixHBaseAccessor.getTimelineMetricKeyFromResultSet(rs);
+ MetricHostAggregate currentHostAggregate =
+ PhoenixHBaseAccessor.getMetricHostAggregateFromResultSet(rs);
+
+ if ("disk_used".equals(currentMetric.getMetricName())) {
+ assertEquals(2.0, currentHostAggregate.getMax());
+ assertEquals(0.0, currentHostAggregate.getMin());
+ assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
+ assertEquals(12 * 15.0, currentHostAggregate.getSum());
+ assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ }
+ }
+ }
+
+ // Creates a "disk_used" metric key (no values) for the given timestamp,
+ // used as a map key when seeding minute-level aggregates.
+ private TimelineMetric createEmptyTimelineMetric(long startTime) {
+ TimelineMetric metric = new TimelineMetric();
+ metric.setMetricName("disk_used");
+ metric.setAppId("test_app");
+ metric.setHostName("test_host");
+ metric.setTimestamp(startTime);
+
+ return metric;
+ }
+
+ // Builds a MetricHostAggregate with the given stats (deviation left at its
+ // default).
+ private MetricHostAggregate
+ createMetricHostAggregate(double max, double min, int numberOfSamples,
+ double sum) {
+ MetricHostAggregate expectedAggregate =
+ new MetricHostAggregate();
+ expectedAggregate.setMax(max);
+ expectedAggregate.setMin(min);
+ expectedAggregate.setNumberOfSamples(numberOfSamples);
+ expectedAggregate.setSum(sum);
+
+ return expectedAggregate;
+ }
+
+ // Builds a PhoenixHBaseAccessor wired to the mini cluster's JDBC URL, with
+ // HBase compression disabled for the test environment.
+ private PhoenixHBaseAccessor createTestableHBaseAccessor() {
+ Configuration metricsConf = new Configuration();
+ metricsConf.set(
+ TimelineMetricConfiguration.HBASE_COMPRESSION_SCHEME, "NONE");
+
+ return
+ new PhoenixHBaseAccessor(
+ new Configuration(),
+ metricsConf,
+ new ConnectionProvider() {
+ @Override
+ public Connection getConnection() {
+ Connection connection = null;
+ try {
+ connection = DriverManager.getConnection(getUrl());
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+ }
+ return connection;
+ }
+ });
+ }
+
+ // Treats two metrics as equal when all fields except timestamps match;
+ // returns 1 otherwise (ordering is not meaningful, only equality).
+ private final static Comparator<TimelineMetric> TIME_IGNORING_COMPARATOR =
+ new Comparator<TimelineMetric>() {
+ @Override
+ public int compare(TimelineMetric o1, TimelineMetric o2) {
+ return o1.equalsExceptTime(o2) ? 0 : 1;
+ }
+ };
+
+ // Two metric series ("disk_free", "mem_free") for one host, each with four
+ // samples summing to 3.0 — see createMetric for the sample values.
+ private TimelineMetrics prepareTimelineMetrics(long startTime, String host) {
+ TimelineMetrics metrics = new TimelineMetrics();
+ metrics.setMetrics(Arrays.asList(
+ createMetric(startTime, "disk_free", host),
+ createMetric(startTime, "mem_free", host)));
+
+ return metrics;
+ }
+
+ // One metric with samples 0.0, 0.0, 1.0, 2.0 at 15-second offsets; the
+ // aggregation tests' expected max=2.0 / min=0.0 / sum derive from these.
+ private TimelineMetric createMetric(long startTime,
+ String metricName,
+ String host) {
+ TimelineMetric m = new TimelineMetric();
+ m.setAppId("host");
+ m.setHostName(host);
+ m.setMetricName(metricName);
+ m.setStartTime(startTime);
+ Map<Long, Double> vals = new HashMap<Long, Double>();
+ vals.put(startTime + 15000l, 0.0);
+ vals.put(startTime + 30000l, 0.0);
+ vals.put(startTime + 45000l, 1.0);
+ vals.put(startTime + 60000l, 2.0);
+
+ m.setMetricValues(vals);
+
+ return m;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
new file mode 100644
index 0000000..0722ccd
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestClusterSuite.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import static org.junit.runners.Suite.SuiteClasses;
+
+// JUnit 4 suite grouping the Phoenix/HBase integration tests so they run
+// together (and can share one mini-cluster lifecycle per run).
+@RunWith(Suite.class)
+@SuiteClasses({ITMetricAggregator.class, ITClusterAggregator.class})
+public class TestClusterSuite {
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
new file mode 100644
index 0000000..5d8ba96
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
+ .timeline;
+
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+// Unit tests for MetricHostAggregate: construction of the basic stats and
+// merging of several aggregates via updateAggregates().
+public class TestMetricHostAggregate {
+
+ @Test
+ public void testCreateAggregate() throws Exception {
+ // given
+ MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2);
+
+ //then
+ assertThat(aggregate.getSum()).isEqualTo(3.0);
+ assertThat(aggregate.getMin()).isEqualTo(1.0);
+ assertThat(aggregate.getMax()).isEqualTo(2.0);
+ // avg is derived: sum / numberOfSamples
+ assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+ }
+
+ // Merging should sum the sums and sample counts, and take the overall
+ // min/max across all merged aggregates.
+ @Test
+ public void testUpdateAggregates() throws Exception {
+ // given
+ MetricHostAggregate aggregate = createAggregate(3.0, 1.0, 2.0, 2);
+
+ //when
+ aggregate.updateAggregates(createAggregate(8.0, 0.5, 7.5, 2));
+ aggregate.updateAggregates(createAggregate(1.0, 1.0, 1.0, 1));
+
+ //then
+ assertThat(aggregate.getSum()).isEqualTo(12.0);
+ assertThat(aggregate.getMin()).isEqualTo(0.5);
+ assertThat(aggregate.getMax()).isEqualTo(7.5);
+ assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+ }
+
+ // Builds an aggregate with the given stats; deviation is set to 0.0 as it
+ // is not exercised by these tests.
+ private MetricHostAggregate createAggregate
+ (double sum, double min, double max, int samplesCount) {
+ MetricHostAggregate aggregate = new MetricHostAggregate();
+ aggregate.setSum(sum);
+ aggregate.setMax(max);
+ aggregate.setMin(min);
+ aggregate.setDeviation(0.0);
+ aggregate.setNumberOfSamples(samplesCount);
+ return aggregate;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
new file mode 100644
index 0000000..758f5a9
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestPhoenixTransactSQL.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixTransactSQL.Condition;
+
+// Unit test for PhoenixTransactSQL.Condition: verifies the generated WHERE
+// clause when all filter fields (metric names, host, app, instance, time
+// range) are supplied.
+public class TestPhoenixTransactSQL {
+ @Test
+ public void testConditionClause() throws Exception {
+ Condition condition = new Condition(
+ Arrays.asList("cpu_user", "mem_free"), "h1", "a1", "i1",
+ 1407959718L, 1407959918L, null, false);
+
+ String preparedClause = condition.getConditionClause();
+ // one '?' placeholder per metric name, then one per scalar filter;
+ // end time is exclusive (SERVER_TIME < ?)
+ String expectedClause = "METRIC_NAME IN (?, ?) AND HOSTNAME = ? AND " +
+ "APP_ID = ? AND INSTANCE_ID = ? AND SERVER_TIME >= ? AND SERVER_TIME < ?";
+
+ Assert.assertNotNull(preparedClause);
+ Assert.assertEquals(expectedClause, preparedClause);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
new file mode 100644
index 0000000..c893314
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+// Stub TimelineMetricStore for web-service tests: returns two canned metrics
+// from getTimelineMetrics and inert defaults elsewhere. Ignores all query
+// parameters.
+public class TestTimelineMetricStore implements TimelineMetricStore {
+ // Returns a fixed pair of metrics ("cpu_user" and "mem_free" on host
+ // c6401) regardless of the filter arguments.
+ @Override
+ public TimelineMetrics getTimelineMetrics(List<String> metricNames,
+ String hostname, String applicationId, String instanceId, Long startTime,
+ Long endTime, Integer limit, boolean groupedByHost) throws SQLException,
+ IOException {
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+ timelineMetrics.setMetrics(metricList);
+ TimelineMetric metric1 = new TimelineMetric();
+ TimelineMetric metric2 = new TimelineMetric();
+ metricList.add(metric1);
+ metricList.add(metric2);
+ metric1.setMetricName("cpu_user");
+ metric1.setAppId("1");
+ metric1.setInstanceId(null);
+ metric1.setHostName("c6401");
+ metric1.setStartTime(1407949812L);
+ metric1.setMetricValues(new HashMap<Long, Double>() {{
+ put(1407949812L, 1.0d);
+ put(1407949912L, 1.8d);
+ put(1407950002L, 0.7d);
+ }});
+
+ metric2.setMetricName("mem_free");
+ metric2.setAppId("2");
+ metric2.setInstanceId("3");
+ metric2.setHostName("c6401");
+ metric2.setStartTime(1407949812L);
+ metric2.setMetricValues(new HashMap<Long, Double>() {{
+ put(1407949812L, 2.5d);
+ put(1407949912L, 3.0d);
+ put(1407950002L, 0.9d);
+ }});
+
+ return timelineMetrics;
+ }
+
+ // Single-metric lookup is not exercised by these tests; always null.
+ @Override
+ public TimelineMetric getTimelineMetric(String metricName, String hostname,
+ String applicationId, String instanceId, Long startTime, Long endTime,
+ Integer limit) throws SQLException, IOException {
+
+ return null;
+ }
+
+ // Accepts any metrics and reports success with an empty response.
+ @Override
+ public TimelinePutResponse putMetrics(TimelineMetrics metrics)
+ throws SQLException, IOException {
+
+ return new TimelinePutResponse();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java
new file mode 100644
index 0000000..d684a27
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+// Tests for GenericObjectMapper: reverse-ordered long encoding (byte order
+// inverts numeric order, used for leveldb keys) and round-tripping of common
+// value types through write()/read().
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestGenericObjectMapper {
+
+ @Test
+ public void testEncoding() {
+ // extremes, zero, values around the single-byte boundary, and a negative
+ testEncoding(Long.MAX_VALUE);
+ testEncoding(Long.MIN_VALUE);
+ testEncoding(0l);
+ testEncoding(128l);
+ testEncoding(256l);
+ testEncoding(512l);
+ testEncoding(-256l);
+ }
+
+ // Round-trips l through writeReverseOrderedLong/readReverseOrderedLong
+ // (both at offset 0 and at a nonzero offset), and checks that the byte
+ // encoding of l-1 sorts AFTER l and l+1 sorts BEFORE l — i.e. byte order
+ // is the reverse of numeric order.
+ private static void testEncoding(long l) {
+ byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
+ assertEquals("error decoding", l,
+ GenericObjectMapper.readReverseOrderedLong(b, 0));
+ byte[] buf = new byte[16];
+ System.arraycopy(b, 0, buf, 5, 8);
+ assertEquals("error decoding at offset", l,
+ GenericObjectMapper.readReverseOrderedLong(buf, 5));
+ if (l > Long.MIN_VALUE) {
+ byte[] a = GenericObjectMapper.writeReverseOrderedLong(l-1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
+ }
+ if (l < Long.MAX_VALUE) {
+ byte[] c = GenericObjectMapper.writeReverseOrderedLong(l+1);
+ assertEquals("error preserving ordering", 1,
+ WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
+ }
+ }
+
+ // Asserts o survives a write()/read() round trip unchanged.
+ private static void verify(Object o) throws IOException {
+ assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
+ }
+
+ @Test
+ public void testValueTypes() throws IOException {
+ verify(Integer.MAX_VALUE);
+ verify(Integer.MIN_VALUE);
+ // longs within the int range are expected to read back as Integer
+ assertEquals(Integer.MAX_VALUE, GenericObjectMapper.read(
+ GenericObjectMapper.write((long) Integer.MAX_VALUE)));
+ assertEquals(Integer.MIN_VALUE, GenericObjectMapper.read(
+ GenericObjectMapper.write((long) Integer.MIN_VALUE)));
+ verify((long)Integer.MAX_VALUE + 1l);
+ verify((long)Integer.MIN_VALUE - 1l);
+
+ verify(Long.MAX_VALUE);
+ verify(Long.MIN_VALUE);
+
+ assertEquals(42, GenericObjectMapper.read(GenericObjectMapper.write(42l)));
+ verify(42);
+ verify(1.23);
+ verify("abc");
+ verify(true);
+ List<String> list = new ArrayList<String>();
+ list.add("123");
+ list.add("abc");
+ verify(list);
+ Map<String,String> map = new HashMap<String,String>();
+ map.put("k1","v1");
+ map.put("k2","v2");
+ verify(map);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
new file mode 100644
index 0000000..9b27309
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.iq80.leveldb.DBIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.junit.Assert.assertEquals;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+/**
+ * Exercises the leveldb-backed TimelineStore against the shared scenarios in
+ * TimelineStoreTestUtils, plus leveldb-specific behavior: start-time caching,
+ * cache sizing, and entity deletion/TTL discard.
+ */
+public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
+ private FileContext fsContext;
+ private File fsPath;
+
+ /** Creates a fresh leveldb store under target/ and loads the shared fixtures. */
+ @Before
+ public void setup() throws Exception {
+ fsContext = FileContext.getLocalFSFileContext();
+ Configuration conf = new Configuration();
+ fsPath = new File("target", this.getClass().getSimpleName() +
+ "-tmpDir").getAbsoluteFile();
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+ fsPath.getAbsolutePath());
+ // TTL eviction is disabled so the tests control deletion explicitly.
+ conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+ store = new LeveldbTimelineStore();
+ store.init(conf);
+ store.start();
+ loadTestData();
+ loadVerificationData();
+ }
+
+ /** Stops the store and removes the on-disk leveldb directory. */
+ @After
+ public void tearDown() throws Exception {
+ store.stop();
+ fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+ }
+
+ /** Runs the single-entity checks twice: once warm, once after clearing the start-time cache. */
+ @Test
+ public void testGetSingleEntity() throws IOException {
+ super.testGetSingleEntity();
+ ((LeveldbTimelineStore)store).clearStartTimeCache();
+ super.testGetSingleEntity();
+ // NOTE(review): reloads the fixtures at the end — presumably to restore
+ // state after the cache-cleared pass; confirm whether this is required.
+ loadTestData();
+ }
+
+ @Test
+ public void testGetEntities() throws IOException {
+ super.testGetEntities();
+ }
+
+ @Test
+ public void testGetEntitiesWithFromId() throws IOException {
+ super.testGetEntitiesWithFromId();
+ }
+
+ @Test
+ public void testGetEntitiesWithFromTs() throws IOException {
+ super.testGetEntitiesWithFromTs();
+ }
+
+ @Test
+ public void testGetEntitiesWithPrimaryFilters() throws IOException {
+ super.testGetEntitiesWithPrimaryFilters();
+ }
+
+ @Test
+ public void testGetEntitiesWithSecondaryFilters() throws IOException {
+ super.testGetEntitiesWithSecondaryFilters();
+ }
+
+ @Test
+ public void testGetEvents() throws IOException {
+ super.testGetEvents();
+ }
+
+ /** Verifies default (10000) and overridden start-time read/write cache sizes. */
+ @Test
+ public void testCacheSizes() {
+ Configuration conf = new Configuration();
+ assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+ assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+ conf.setInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+ 10001);
+ assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+ conf = new Configuration();
+ conf.setInt(
+ YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+ 10002);
+ assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+ }
+
+ /**
+  * Deletes the next entity of the given type older than timestamp {@code ts}
+  * (reverse-ordered bytes), closing both db iterators afterwards.
+  * @return whether an entity was deleted
+  */
+ private boolean deleteNextEntity(String entityType, byte[] ts)
+ throws IOException, InterruptedException {
+ DBIterator iterator = null;
+ DBIterator pfIterator = null;
+ try {
+ iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
+ pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
+ return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
+ iterator, pfIterator, false);
+ } finally {
+ IOUtils.cleanup(null, iterator, pfIterator);
+ }
+ }
+
+ @Test
+ public void testGetEntityTypes() throws IOException {
+ List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
+ assertEquals(4, entityTypes.size());
+ assertEquals(entityType1, entityTypes.get(0));
+ assertEquals(entityType2, entityTypes.get(1));
+ assertEquals(entityType4, entityTypes.get(2));
+ assertEquals(entityType5, entityTypes.get(3));
+ }
+
+ /** Verifies targeted deletion and bulk discard of old entities. */
+ @Test
+ public void testDeleteEntities() throws IOException, InterruptedException {
+ assertEquals(2, getEntities("type_1").size());
+ assertEquals(1, getEntities("type_2").size());
+
+ // Threshold 122 is older than the fixtures, so nothing is deleted.
+ assertEquals(false, deleteNextEntity(entityType1,
+ writeReverseOrderedLong(122L)));
+ assertEquals(2, getEntities("type_1").size());
+ assertEquals(1, getEntities("type_2").size());
+
+ assertEquals(true, deleteNextEntity(entityType1,
+ writeReverseOrderedLong(123L)));
+ List<TimelineEntity> entities = getEntities("type_2");
+ assertEquals(1, entities.size());
+ verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
+ entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
+ EMPTY_MAP, entities.get(0));
+ entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
+ assertEquals(1, entities.size());
+ verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+ primaryFilters, otherInfo, entities.get(0));
+
+ ((LeveldbTimelineStore)store).discardOldEntities(-123L);
+ assertEquals(1, getEntities("type_1").size());
+ assertEquals(0, getEntities("type_2").size());
+ assertEquals(3, ((LeveldbTimelineStore)store).getEntityTypes().size());
+
+ ((LeveldbTimelineStore)store).discardOldEntities(123L);
+ assertEquals(0, getEntities("type_1").size());
+ assertEquals(0, getEntities("type_2").size());
+ assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
+ assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+ }
+
+ /** Verifies deletion interacts correctly with primary-filter indexes. */
+ @Test
+ public void testDeleteEntitiesPrimaryFilters()
+ throws IOException, InterruptedException {
+ Map<String, Set<Object>> primaryFilter =
+ Collections.singletonMap("user", Collections.singleton(
+ (Object) "otheruser"));
+ TimelineEntities atsEntities = new TimelineEntities();
+ atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b,
+ entityType1, 789L, Collections.singletonList(ev2), null, primaryFilter,
+ null)));
+ TimelinePutResponse response = store.put(atsEntities);
+ assertEquals(0, response.getErrors().size());
+
+ NameValuePair pfPair = new NameValuePair("user", "otheruser");
+ List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
+ pfPair);
+ assertEquals(1, entities.size());
+ verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
+ EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0));
+
+ entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
+ assertEquals(2, entities.size());
+ verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
+ primaryFilters, otherInfo, entities.get(0));
+ verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
+ primaryFilters, otherInfo, entities.get(1));
+
+ ((LeveldbTimelineStore)store).discardOldEntities(-123L);
+ assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
+ assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+
+ ((LeveldbTimelineStore)store).discardOldEntities(123L);
+ assertEquals(0, getEntities("type_1").size());
+ assertEquals(0, getEntities("type_2").size());
+ assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
+
+ assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
+ assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+ }
+
+ /** Verifies fromTs queries exclude entities inserted after the timestamp, across deletions. */
+ @Test
+ public void testFromTsWithDeletion()
+ throws IOException, InterruptedException {
+ long l = System.currentTimeMillis();
+ assertEquals(2, getEntitiesFromTs("type_1", l).size());
+ assertEquals(1, getEntitiesFromTs("type_2", l).size());
+ assertEquals(2, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+ l).size());
+ ((LeveldbTimelineStore)store).discardOldEntities(123L);
+ assertEquals(0, getEntitiesFromTs("type_1", l).size());
+ assertEquals(0, getEntitiesFromTs("type_2", l).size());
+ assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+ l).size());
+ assertEquals(0, getEntities("type_1").size());
+ assertEquals(0, getEntities("type_2").size());
+ assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+ l).size());
+ // Reloaded data postdates l, so fromTs queries still see nothing while
+ // unfiltered queries see the fresh fixtures.
+ loadTestData();
+ assertEquals(0, getEntitiesFromTs("type_1", l).size());
+ assertEquals(0, getEntitiesFromTs("type_2", l).size());
+ assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
+ l).size());
+ assertEquals(2, getEntities("type_1").size());
+ assertEquals(1, getEntities("type_2").size());
+ assertEquals(2, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a52f8a55/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java
new file mode 100644
index 0000000..415de53
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestMemoryTimelineStore.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Exercises the in-memory TimelineStore implementation against the shared
+ * scenarios defined in TimelineStoreTestUtils.
+ */
+public class TestMemoryTimelineStore extends TimelineStoreTestUtils {
+
+ /** Creates an in-memory store and loads the shared fixtures. */
+ @Before
+ public void setup() throws Exception {
+ store = new MemoryTimelineStore();
+ store.init(new YarnConfiguration());
+ store.start();
+ loadTestData();
+ loadVerificationData();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.stop();
+ }
+
+ /** @return the store under test, for subclasses or external callers */
+ public TimelineStore getTimelineStore() {
+ return store;
+ }
+
+ // @Override added below: these methods override the shared scenarios in
+ // TimelineStoreTestUtils (proved by the super. calls) to attach @Test.
+
+ @Override
+ @Test
+ public void testGetSingleEntity() throws IOException {
+ super.testGetSingleEntity();
+ }
+
+ @Override
+ @Test
+ public void testGetEntities() throws IOException {
+ super.testGetEntities();
+ }
+
+ @Override
+ @Test
+ public void testGetEntitiesWithFromId() throws IOException {
+ super.testGetEntitiesWithFromId();
+ }
+
+ @Override
+ @Test
+ public void testGetEntitiesWithFromTs() throws IOException {
+ super.testGetEntitiesWithFromTs();
+ }
+
+ @Override
+ @Test
+ public void testGetEntitiesWithPrimaryFilters() throws IOException {
+ super.testGetEntitiesWithPrimaryFilters();
+ }
+
+ @Override
+ @Test
+ public void testGetEntitiesWithSecondaryFilters() throws IOException {
+ super.testGetEntitiesWithSecondaryFilters();
+ }
+
+ @Override
+ @Test
+ public void testGetEvents() throws IOException {
+ super.testGetEvents();
+ }
+
+}