You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/10/06 17:53:49 UTC
ambari git commit: AMBARI-22163 : Anomaly Storage: Design Metric
anomalies schema. (avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams 4613b471e -> a9c6054fe
AMBARI-22163 : Anomaly Storage: Design Metric anomalies schema. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/a9c6054f
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/a9c6054f
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/a9c6054f
Branch: refs/heads/branch-3.0-ams
Commit: a9c6054fe3c2512f8021f3bb4fb9150e40238c5b
Parents: 4613b47
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Fri Oct 6 10:53:28 2017 -0700
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Fri Oct 6 10:53:28 2017 -0700
----------------------------------------------------------------------
ambari-logsearch/ambari-logsearch-it/pom.xml | 2 +-
.../pom.xml | 33 +++++-
.../common/ADServiceConfiguration.scala | 74 +++++++++++++
.../common/PhoenixQueryConstants.scala | 109 +++++++++++++++++++
.../db/PhoenixAnomalyStoreAccessor.scala | 67 ++++++++++++
.../spark/prototype/SparkPhoenixReader.scala | 92 ++++++++--------
.../common/ADManagerConfigurationTest.scala | 23 ++++
.../db/PhoenixAnomalyStoreAccessorTest.scala | 26 +++++
ambari-metrics/ambari-metrics-common/pom.xml | 46 ++++++++
.../sink/timeline/query/ConnectionProvider.java | 32 ++++++
.../query/DefaultPhoenixDataSource.java | 108 ++++++++++++++++++
.../query/PhoenixConnectionProvider.java | 31 ++++++
.../metrics/timeline/PhoenixHBaseAccessor.java | 23 +---
.../timeline/query/ConnectionProvider.java | 29 -----
.../query/DefaultPhoenixDataSource.java | 90 ---------------
.../query/PhoenixConnectionProvider.java | 31 ------
.../TestApplicationHistoryServer.java | 2 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 6 +-
.../timeline/PhoenixHBaseAccessorTest.java | 4 +-
19 files changed, 601 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-logsearch/ambari-logsearch-it/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-it/pom.xml b/ambari-logsearch/ambari-logsearch-it/pom.xml
index db3e09f..b3a1d45 100644
--- a/ambari-logsearch/ambari-logsearch-it/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-it/pom.xml
@@ -122,7 +122,7 @@
</dependencies>
<build>
- <testOutputDirectory>target/classes</testOutputDirectory>
+ <testOutputDirectory>test/target/classes</testOutputDirectory>
<testResources>
<testResource>
<directory>src/test/java/</directory>
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 1a10f86..6f8f8c1 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -29,8 +29,9 @@
<artifactId>ambari-metrics-anomaly-detection-service</artifactId>
<version>2.0.0.0-SNAPSHOT</version>
<properties>
- <scala.version>2.10.4</scala.version>
+ <scala.version>2.11.1</scala.version>
<scala.binary.version>2.11</scala.binary.version>
+ <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
</properties>
<repositories>
@@ -201,5 +202,35 @@
<version>2.1.1</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.11</artifactId>
+ <version>3.0.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala
new file mode 100644
index 0000000..248c74e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/ADServiceConfiguration.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.common
+
+import java.net.{MalformedURLException, URISyntaxException}
+
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * Holds the two Hadoop configurations used by the anomaly detection (AD) service:
+ * one built from hbase-site.xml and one built from ams-ad-site.xml, both looked up on the classpath.
+ *
+ * Both configurations are null until [[initConfigs]] has completed successfully.
+ */
+object ADServiceConfiguration {
+
+  private val AMS_AD_SITE_CONFIGURATION_FILE = "ams-ad-site.xml"
+  private val HBASE_SITE_CONFIGURATION_FILE = "hbase-site.xml"
+
+  val ANOMALY_METRICS_TTL = "timeline.metrics.anomaly.data.ttl"
+
+  // Fallback used when the property is absent or the AD configuration has not been loaded.
+  // NOTE(review): 604800 looks like 7 days expressed in seconds - confirm the unit.
+  private val DEFAULT_ANOMALY_METRICS_TTL = 604800
+
+  private var hbaseConf: org.apache.hadoop.conf.Configuration = _
+  private var adConf: org.apache.hadoop.conf.Configuration = _
+
+  /**
+   * Locates hbase-site.xml and ams-ad-site.xml on the classpath and loads each into its own
+   * Configuration. The thread context class loader is preferred; this object's own class loader
+   * is the fallback.
+   *
+   * @throws IllegalStateException if either file is missing from the classpath, or if a resource
+   *                               location cannot be converted into a valid URL
+   */
+  def initConfigs(): Unit = {
+
+    val contextClassLoader = Thread.currentThread.getContextClassLoader
+    val classLoader = if (contextClassLoader != null) contextClassLoader else getClass.getClassLoader
+
+    try {
+      val hbaseResUrl = classLoader.getResource(HBASE_SITE_CONFIGURATION_FILE)
+      if (hbaseResUrl == null) {
+        throw new IllegalStateException("Unable to initialize the AD subsystem. No hbase-site present in the classpath.")
+      }
+      hbaseConf = new Configuration(true)
+      hbaseConf.addResource(hbaseResUrl.toURI.toURL)
+
+      val adSystemConfigUrl = classLoader.getResource(AMS_AD_SITE_CONFIGURATION_FILE)
+      if (adSystemConfigUrl == null) {
+        throw new IllegalStateException("Unable to initialize the AD subsystem. No ams-ad-site present in the classpath")
+      }
+      adConf = new Configuration(true)
+      adConf.addResource(adSystemConfigUrl.toURI.toURL)
+
+    } catch {
+      // Fail loudly and keep the cause, consistent with the missing-resource failures above,
+      // instead of printing a bare exception name and continuing with half-initialized state.
+      case e @ (_: MalformedURLException | _: URISyntaxException) =>
+        throw new IllegalStateException("Unable to initialize the AD subsystem. Invalid configuration resource location.", e)
+    }
+  }
+
+  /** @return the configuration loaded from hbase-site.xml, or null if [[initConfigs]] has not succeeded */
+  def getHBaseConf: org.apache.hadoop.conf.Configuration = {
+    hbaseConf
+  }
+
+  /** @return the configuration loaded from ams-ad-site.xml, or null if [[initConfigs]] has not succeeded */
+  def getAdConf: org.apache.hadoop.conf.Configuration = {
+    adConf
+  }
+
+  /**
+   * TTL for anomaly data, read from the "timeline.metrics.anomaly.data.ttl" property.
+   *
+   * @return the configured value, or 604800 when the property is not set or the AD configuration
+   *         has not been loaded
+   */
+  def getAnomalyDataTtl: Int = {
+    if (adConf == null) DEFAULT_ANOMALY_METRICS_TTL
+    else adConf.getInt(ANOMALY_METRICS_TTL, DEFAULT_ANOMALY_METRICS_TTL)
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala
new file mode 100644
index 0000000..5e90d2b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/common/PhoenixQueryConstants.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.common
+
+/**
+ * Table names and SQL templates for the anomaly store kept in Phoenix.
+ *
+ * Every statement contains a leading %s slot for the table name, to be filled with String.format.
+ * The TTL-bearing CREATE statements take a second %s slot for the TTL value. Slots written as '?'
+ * are JDBC bind parameters.
+ */
+object PhoenixQueryConstants {
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /* Table Name constants */
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  val METRIC_PROFILE_TABLE_NAME = "METRIC_PROFILE"
+  val METHOD_PARAMETERS_TABLE_NAME = "METHOD_PARAMETERS"
+  val PIT_ANOMALY_METRICS_TABLE_NAME = "PIT_METRIC_ANOMALIES"
+  val TREND_ANOMALY_METRICS_TABLE_NAME = "TREND_METRIC_ANOMALIES"
+  val MODEL_SNAPSHOT = "MODEL_SNAPSHOT"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /* CREATE statement constants */
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // Placeholder: no schema has been defined for the metric profile table yet.
+  val CREATE_METRIC_PROFILE_TABLE = ""
+
+  val CREATE_METHOD_PARAMETERS_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
+    "METHOD_NAME VARCHAR, " +
+    "METHOD_TYPE VARCHAR, " +
+    "PARAMETERS VARCHAR " +
+    "CONSTRAINT pk PRIMARY KEY (METHOD_NAME)) " +
+    "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'"
+
+  val CREATE_PIT_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" +
+    "METRIC_UUID BINARY(20) NOT NULL, " +
+    "METHOD_NAME VARCHAR, " +
+    "ANOMALY_TIMESTAMP UNSIGNED_LONG NOT NULL, " +
+    "METRIC_VALUE DOUBLE, " +
+    "SEASONAL_INFO VARCHAR, " +
+    "ANOMALY_SCORE DOUBLE, " +
+    "MODEL_SNAPSHOT VARCHAR, " +
+    "DETECTION_TIME UNSIGNED_LONG " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_TIMESTAMP)) " +
+    "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
+
+  val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" +
+    "METRIC_UUID BINARY(20) NOT NULL, " +
+    "ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " +
+    "ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " +
+    "TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " +
+    "TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " +
+    "METHOD_NAME VARCHAR, " +
+    "ANOMALY_SCORE DOUBLE, " +
+    "MODEL_SNAPSHOT VARCHAR, " +
+    "DETECTION_TIME UNSIGNED_LONG " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " +
+    "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
+
+  // NOTE(review): SNAPSHOT_TIME is declared NOT NULL but is neither part of the primary key nor
+  // populated by UPSERT_MODEL_SNAPSHOT_SQL - confirm that Phoenix accepts this and that it is intended.
+  val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
+    "METRIC_UUID BINARY(20), " +
+    "METHOD_NAME VARCHAR, " +
+    "METHOD_TYPE VARCHAR, " +
+    "PARAMETERS VARCHAR, " +
+    "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " +
+    "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " +
+    "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /* UPSERT statement constants */
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  val UPSERT_METHOD_PARAMETERS_SQL: String = "UPSERT INTO %s (METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?,?,?)"
+
+  val UPSERT_PIT_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, " +
+    "SEASONAL_INFO, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+
+  val UPSERT_TREND_ANOMALY_METRICS_SQL: String = "UPSERT INTO %s (METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, " +
+    "TEST_PERIOD_START, TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)"
+
+  val UPSERT_MODEL_SNAPSHOT_SQL: String = "UPSERT INTO %s (METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS) VALUES (?, ?, ?, ?)"
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+  /* GET statement constants */
+  //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+  // NOTE(review): the WHERE value below is a String.format slot, not a JDBC bind parameter, so the
+  // caller has to supply an already-quoted SQL literal. Consider switching to '?' once callers exist.
+  val GET_METHOD_PARAMETERS_SQL: String = "SELECT METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METHOD_NAME = %s"
+
+  // Original (misspelled) name, kept so existing references continue to compile.
+  val GET_METHOD_PAREMETERS_SQL: String = GET_METHOD_PARAMETERS_SQL
+
+  val GET_PIT_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_TIMESTAMP, METRIC_VALUE, METHOD_NAME, SEASONAL_INFO, " +
+    "ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METRIC_UUID = ? AND ANOMALY_TIMESTAMP > ? AND ANOMALY_TIMESTAMP <= ? " +
+    "ORDER BY ANOMALY_SCORE DESC"
+
+  // Column names aligned with CREATE_TREND_ANOMALY_METRICS_TABLE_SQL; the select list mirrors the
+  // column order of UPSERT_TREND_ANOMALY_METRICS_SQL.
+  val GET_TREND_ANOMALY_METRIC_SQL: String = "SELECT METRIC_UUID, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, " +
+    "TEST_PERIOD_END, METHOD_NAME, ANOMALY_SCORE, MODEL_SNAPSHOT, DETECTION_TIME FROM %s WHERE METHOD_NAME = ? AND ANOMALY_PERIOD_END > ? " +
+    "AND TEST_PERIOD_END <= ? ORDER BY ANOMALY_SCORE DESC"
+
+  // NOTE(review): both WHERE values are String.format slots; see the note on GET_METHOD_PARAMETERS_SQL.
+  val GET_MODEL_SNAPSHOT_SQL: String = "SELECT METRIC_UUID, METHOD_NAME, METHOD_TYPE, PARAMETERS FROM %s WHERE METRIC_UUID = %s AND METHOD_NAME = %s"
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
new file mode 100644
index 0000000..6f33e56
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import java.sql.{Connection, SQLException}
+
+import org.apache.ambari.metrics.adservice.common.{ADServiceConfiguration, PhoenixQueryConstants}
+import org.apache.hadoop.hbase.util.RetryCounterFactory
+import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
+import java.util.concurrent.TimeUnit.SECONDS
+
+/**
+ * Entry point for the Phoenix-backed anomaly store: creates the anomaly tables and hands out
+ * connections from the shared data source.
+ */
+object PhoenixAnomalyStoreAccessor {
+
+  // Assigned by initAnomalyMetricSchema(); null until that method has been called.
+  private var datasource: PhoenixConnectionProvider = _
+
+  /**
+   * Creates the data source from the HBase configuration held by ADServiceConfiguration and
+   * issues the CREATE TABLE IF NOT EXISTS statements for the method-parameters,
+   * point-in-time anomaly, trend anomaly and model-snapshot tables.
+   *
+   * The connection is obtained with a retry counter factory built from (10, 3000 ms); the
+   * statement and the connection are always closed before returning.
+   */
+  @throws[SQLException]
+  def initAnomalyMetricSchema(): Unit = {
+
+    // Assign the object-level field (not a shadowing local) so getConnection works afterwards.
+    datasource = new DefaultPhoenixDataSource(ADServiceConfiguration.getHBaseConf)
+    val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
+
+    val ttl = ADServiceConfiguration.getAnomalyDataTtl
+
+    val conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
+    try {
+      val stmt = conn.createStatement
+      try {
+        val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE,
+          PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME)
+        stmt.executeUpdate(methodParametersSql)
+
+        val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL,
+          PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME,
+          ttl.asInstanceOf[Object])
+        stmt.executeUpdate(pointInTimeAnomalySql)
+
+        val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
+          PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME,
+          ttl.asInstanceOf[Object])
+        stmt.executeUpdate(trendAnomalySql)
+
+        val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE,
+          PhoenixQueryConstants.MODEL_SNAPSHOT)
+        stmt.executeUpdate(snapshotSql)
+
+        conn.commit()
+      } finally {
+        stmt.close()
+      }
+    } finally {
+      // Release the connection on both the success and the failure path.
+      conn.close()
+    }
+  }
+
+  /**
+   * Returns a new connection from the data source. initAnomalyMetricSchema() must have been
+   * called first, because the data source is only created there.
+   */
+  @throws[SQLException]
+  def getConnection: Connection = datasource.getConnection
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
index 6e1ae07..ac00764 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/spark/prototype/SparkPhoenixReader.scala
@@ -26,52 +26,52 @@ object SparkPhoenixReader {
def main(args: Array[String]) {
- if (args.length < 6) {
- System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
- System.exit(1)
- }
-
- var metricName = args(0)
- var appId = args(1)
- var hostname = args(2)
- var weight = args(3).toDouble
- var timessdev = args(4).toInt
- var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
- var modelDir = args(6)
-
- val conf = new SparkConf()
- conf.set("spark.app.name", "AMSAnomalyModelBuilder")
- //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
-
- var sc = new SparkContext(conf)
- val sqlContext = new SQLContext(sc)
-
- val currentTime = System.currentTimeMillis()
- val oneDayBack = currentTime - 24*60*60*1000
-
- val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
- df.registerTempTable("METRIC_RECORD")
- val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
- "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)
-
- var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
- result.collect().foreach(
- t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
- )
-
- //val seriesName = result.head().getString(0)
- //val hostname = result.head().getString(1)
- //val appId = result.head().getString(2)
-
- val timelineMetric = new TimelineMetric()
- timelineMetric.setMetricName(metricName)
- timelineMetric.setAppId(appId)
- timelineMetric.setHostName(hostname)
- timelineMetric.setMetricValues(metricValues)
-
- var emaModel = new EmaTechnique(weight, timessdev)
- emaModel.test(timelineMetric)
- emaModel.save(sc, modelDir)
+// if (args.length < 6) {
+// System.err.println("Usage: SparkPhoenixReader <metric_name> <appId> <hostname> <weight> <timessdev> <phoenixConnectionString> <model_dir>")
+// System.exit(1)
+// }
+//
+// var metricName = args(0)
+// var appId = args(1)
+// var hostname = args(2)
+// var weight = args(3).toDouble
+// var timessdev = args(4).toInt
+// var phoenixConnectionString = args(5) //avijayan-ams-3.openstacklocal:61181:/ams-hbase-unsecure
+// var modelDir = args(6)
+//
+// val conf = new SparkConf()
+// conf.set("spark.app.name", "AMSAnomalyModelBuilder")
+// //conf.set("spark.master", "spark://avijayan-ams-2.openstacklocal:7077")
+//
+// var sc = new SparkContext(conf)
+// val sqlContext = new SQLContext(sc)
+//
+// val currentTime = System.currentTimeMillis()
+// val oneDayBack = currentTime - 24*60*60*1000
+//
+// val df = sqlContext.load("org.apache.phoenix.spark", Map("table" -> "METRIC_RECORD", "zkUrl" -> phoenixConnectionString))
+// df.registerTempTable("METRIC_RECORD")
+// val result = sqlContext.sql("SELECT METRIC_NAME, HOSTNAME, APP_ID, SERVER_TIME, METRIC_SUM, METRIC_COUNT FROM METRIC_RECORD " +
+// "WHERE METRIC_NAME = '" + metricName + "' AND HOSTNAME = '" + hostname + "' AND APP_ID = '" + appId + "' AND SERVER_TIME < " + currentTime + " AND SERVER_TIME > " + oneDayBack)
+//
+// var metricValues = new java.util.TreeMap[java.lang.Long, java.lang.Double]
+// result.collect().foreach(
+// t => metricValues.put(t.getLong(3), t.getDouble(4) / t.getInt(5))
+// )
+//
+// //val seriesName = result.head().getString(0)
+// //val hostname = result.head().getString(1)
+// //val appId = result.head().getString(2)
+//
+// val timelineMetric = new TimelineMetric()
+// timelineMetric.setMetricName(metricName)
+// timelineMetric.setAppId(appId)
+// timelineMetric.setHostName(hostname)
+// timelineMetric.setMetricValues(metricValues)
+//
+// var emaModel = new EmaTechnique(weight, timessdev)
+// emaModel.test(timelineMetric)
+// emaModel.save(sc, modelDir)
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala
new file mode 100644
index 0000000..535dc9e
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/common/ADManagerConfigurationTest.scala
@@ -0,0 +1,23 @@
+package org.apache.ambari.metrics.adservice.common
+
+import org.scalatest.FlatSpec
+
+import scala.collection.mutable
+
+// Placeholder suite: the cases below exercise scala.collection.mutable.Stack only and do not
+// touch ADServiceConfiguration yet.
+class ADServiceConfigurationTest extends FlatSpec {
+
+  "A Stack" should "pop values in last-in-first-out order" in {
+    val numbers = new mutable.Stack[Int]
+    // push returns the stack itself, so the two pushes can be chained.
+    numbers.push(1).push(2)
+    assert(numbers.pop() === 2)
+    assert(numbers.pop() === 1)
+  }
+
+  it should "throw NoSuchElementException if an empty stack is popped" in {
+    val nothingPushed = new mutable.Stack[String]
+    assertThrows[NoSuchElementException] {
+      nothingPushed.pop()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala
new file mode 100644
index 0000000..142e98a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessorTest.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.db
+
+import org.scalatest.FunSuite
+
+// Test suite for PhoenixAnomalyStoreAccessor. Currently a stub: the single test case has an
+// empty body and therefore asserts nothing.
+class PhoenixAnomalyStoreAccessorTest extends FunSuite {
+
+  // TODO: exercise PhoenixAnomalyStoreAccessor.initAnomalyMetricSchema here.
+  test("testInitAnomalyMetricSchema") {
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index 1a7fef3..e868557 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -26,6 +26,13 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>ambari-metrics-common</artifactId>
<name>Ambari Metrics Common</name>
+
+ <properties>
+ <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
+ <hbase.version>1.1.2.2.6.0.3-8</hbase.version>
+ <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
+ </properties>
+
<build>
<plugins>
<plugin>
@@ -126,6 +133,45 @@
<dependencies>
<dependency>
+ <groupId>org.apache.phoenix</groupId>
+ <artifactId>phoenix-core</artifactId>
+ <version>${phoenix.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-el</groupId>
+ <artifactId>commons-el</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>tomcat</groupId>
+ <artifactId>jasper-compiler</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>jsp-2.1-jetty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>2.10.0</version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
new file mode 100644
index 0000000..72e5fb5
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.query;
+
+
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ *
+ */
+public interface ConnectionProvider {
+ public Connection getConnection() throws SQLException;
+ public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..a28a433
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline.query;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
+
+ static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+ private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
+ private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+ private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+ private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+ private final String url;
+
+ private Configuration hbaseConf;
+
+ public DefaultPhoenixDataSource(Configuration hbaseConf) {
+ this.hbaseConf = hbaseConf;
+ String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
+ String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+ String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
+ if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+ throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+ "access HBase store using Phoenix.");
+ }
+
+ url = String.format(connectionUrl,
+ zookeeperQuorum,
+ zookeeperClientPort,
+ znodeParent);
+ }
+
+ /**
+ * Get HBaseAdmin for table ops. Each call creates a new HBase connection from hbaseConf; that connection is not retained or closed here.
+ * @return the Admin obtained from the newly created connection, cast to {@link HBaseAdmin}
+ * @throws IOException if creating the connection or obtaining its Admin fails
+ */
+ public HBaseAdmin getHBaseAdmin() throws IOException {
+ return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin();
+ }
+
+ /**
+ * Get JDBC connection to HBase store. Assumption is that the hbase
+ * configuration is present on the classpath and loaded by the caller into
+ * the Configuration object.
+ * Phoenix already caches the HConnection between the client and HBase
+ * cluster.
+ * A failed attempt is logged at WARN and the SQLException is rethrown to the caller.
+ * @return the {@link java.sql.Connection} obtained from DriverManager for the Phoenix url built in the constructor
+ */
+ public Connection getConnection() throws SQLException {
+
+ LOG.debug("Metric store connection url: " + url);
+ try {
+ return DriverManager.getConnection(url);
+ } catch (SQLException e) {
+ LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+
+ throw e;
+ }
+ }
+
+ public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory)
+ throws SQLException, InterruptedException {
+ RetryCounter retryCounter = retryCounterFactory.create();
+ while (true) {
+ try{
+ return getConnection();
+ } catch (SQLException e) {
+ if(!retryCounter.shouldRetry()){
+ LOG.error("HBaseAccessor getConnection failed after "
+ + retryCounter.getMaxAttempts() + " attempts");
+ throw e;
+ }
+ }
+ retryCounter.sleepUntilNextRetry();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
new file mode 100644
index 0000000..194c769
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
@@ -0,0 +1,31 @@
+package org.apache.hadoop.metrics2.sink.timeline.query;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+
+import java.io.IOException;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface PhoenixConnectionProvider extends ConnectionProvider {
+ /**
+ * Get HBaseAdmin for the Phoenix connection
+ * @return the {@link HBaseAdmin} supplied by the implementation
+ * @throws IOException if the implementation fails to obtain the admin
+ */
+ HBaseAdmin getHBaseAdmin() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 685e638..e218691 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -140,8 +140,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
@@ -458,23 +458,6 @@ public class PhoenixHBaseAccessor {
return mapper.readValue(json, metricValuesTypeRef);
}
- private Connection getConnectionRetryingOnException()
- throws SQLException, InterruptedException {
- RetryCounter retryCounter = retryCounterFactory.create();
- while (true) {
- try{
- return getConnection();
- } catch (SQLException e) {
- if(!retryCounter.shouldRetry()){
- LOG.error("HBaseAccessor getConnection failed after "
- + retryCounter.getMaxAttempts() + " attempts");
- throw e;
- }
- }
- retryCounter.sleepUntilNextRetry();
- }
- }
-
/**
* Get JDBC connection to HBase store. Assumption is that the hbase
* configuration is present on the classpath and loaded by the caller into
@@ -507,7 +490,7 @@ public class PhoenixHBaseAccessor {
try {
LOG.info("Initializing metrics schema...");
- conn = getConnectionRetryingOnException();
+ conn = dataSource.getConnectionRetryingOnException(retryCounterFactory);
stmt = conn.createStatement();
// Metadata
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
deleted file mode 100644
index 24239a0..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- *
- */
-public interface ConnectionProvider {
- public Connection getConnection() throws SQLException;
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
deleted file mode 100644
index c5761f7..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
-
- static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
- private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
- private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
- private static final String ZNODE_PARENT = "zookeeper.znode.parent";
-
- private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
- private final String url;
-
- private Configuration hbaseConf;
-
- public DefaultPhoenixDataSource(Configuration hbaseConf) {
- this.hbaseConf = hbaseConf;
- String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
- String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
- String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
- if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
- throw new IllegalStateException("Unable to find Zookeeper quorum to " +
- "access HBase store using Phoenix.");
- }
-
- url = String.format(connectionUrl,
- zookeeperQuorum,
- zookeeperClientPort,
- znodeParent);
- }
-
- /**
- * Get HBaseAdmin for table ops.
- * @return @HBaseAdmin
- * @throws IOException
- */
- public HBaseAdmin getHBaseAdmin() throws IOException {
- return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin();
- }
-
- /**
- * Get JDBC connection to HBase store. Assumption is that the hbase
- * configuration is present on the classpath and loaded by the caller into
- * the Configuration object.
- * Phoenix already caches the HConnection between the client and HBase
- * cluster.
- *
- * @return @java.sql.Connection
- */
- public Connection getConnection() throws SQLException {
-
- LOG.debug("Metric store connection url: " + url);
- try {
- return DriverManager.getConnection(url);
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-
- throw e;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
deleted file mode 100644
index cacbcfb..0000000
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface PhoenixConnectionProvider extends ConnectionProvider {
- /**
- * Get HBaseAdmin for the Phoenix connection
- * @return
- * @throws IOException
- */
- HBaseAdmin getHBaseAdmin() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 03205e7..7b70a80 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource;
import org.apache.zookeeper.ClientCnxn;
import org.easymock.EasyMock;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 3a42db9..40691d6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -22,13 +22,9 @@ import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.ti
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL.UPSERT_METRICS_SQL;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@@ -48,7 +44,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/a9c6054f/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 7be3c0d..97d2512 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -32,19 +32,17 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.phoenix.exception.PhoenixIOException;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;