You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by av...@apache.org on 2017/11/20 19:09:46 UTC
[1/2] ambari git commit: AMBARI-22470 : Refine Metric Definition
Service and AD Query service. (avijayan)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams 0fcca47f6 -> ba9be8028
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
index 98ce0c4..db12307 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/AnomalyResource.scala
@@ -19,17 +19,62 @@ package org.apache.ambari.metrics.adservice.resource
import javax.ws.rs.core.MediaType.APPLICATION_JSON
import javax.ws.rs.core.Response
-import javax.ws.rs.{GET, Path, Produces}
+import javax.ws.rs.{GET, Path, Produces, QueryParam}
-import org.joda.time.DateTime
+import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.service.ADQueryService
+import org.apache.commons.lang.StringUtils
+
+import com.google.inject.Inject
@Path("/anomaly")
class AnomalyResource {
+ @Inject
+ var aDQueryService: ADQueryService = _
+
@GET
@Produces(Array(APPLICATION_JSON))
- def default: Response = {
- Response.ok.entity(Map("message" -> "Anomaly Detection Service!",
- "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build()
+ def getTopNAnomalies(@QueryParam("type") anType: String,
+ @QueryParam("startTime") startTime: Long,
+ @QueryParam("endTime") endTime: Long,
+ @QueryParam("top") limit: Int): Response = {
+
+ val anomalies: List[MetricAnomalyInstance] = aDQueryService.getTopNAnomaliesByType(
+ parseAnomalyType(anType),
+ parseStartTime(startTime),
+ parseEndTime(endTime),
+ parseTop(limit))
+
+ Response.ok.entity(anomalies).build()
+ }
+
+  /**
+   * Resolves the "type" query parameter to an AnomalyType.
+   *
+   * @param anomalyType raw query parameter value; may be null or empty
+   * @return AnomalyType.POINT_IN_TIME when the value is null/empty, otherwise the
+   *         value looked up by its upper-cased name
+   */
+  private def parseAnomalyType(anomalyType: String) : AnomalyType = {
+    if (StringUtils.isEmpty(anomalyType)) {
+      AnomalyType.POINT_IN_TIME
+    } else {
+      // Upper-case with a fixed locale: the default-locale variant maps 'i' to a dotted
+      // capital under e.g. a Turkish locale, which would break the name lookup.
+      // NOTE(review): withName presumably throws for unknown names - confirm, and
+      // consider mapping that case to a 400 response.
+      AnomalyType.withName(anomalyType.toUpperCase(java.util.Locale.ROOT))
+    }
+  }
+
+ private def parseStartTime(startTime: Long) : Long = {
+ if (startTime > 0l) {
+ return startTime
+ }
+ System.currentTimeMillis() - 60*60*1000
+ }
+
+ private def parseEndTime(endTime: Long) : Long = {
+ if (endTime > 0l) {
+ return endTime
+ }
+ System.currentTimeMillis()
+ }
+
+ private def parseTop(limit: Int) : Int = {
+ if (limit > 0) {
+ return limit
+ }
+ 5
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
index 16125fa..442bf46 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/MetricDefinitionResource.scala
@@ -17,10 +17,11 @@
package org.apache.ambari.metrics.adservice.resource
-import javax.ws.rs.{GET, Path, Produces}
+import javax.ws.rs._
import javax.ws.rs.core.MediaType.APPLICATION_JSON
+import javax.ws.rs.core.Response
-import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricSourceDefinition}
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey, MetricSourceDefinition}
import org.apache.commons.lang.StringUtils
import com.google.inject.Inject
@@ -33,8 +34,76 @@ class MetricDefinitionResource {
@GET
@Produces(Array(APPLICATION_JSON))
- def getMetricDefinition (definitionName: String) : MetricSourceDefinition = {
- null
+ @Path("/{name}")
+  def defaultGet(@PathParam("name") definitionName: String): Response = {
+    // Returns the named metric source definition, or a JSON message when the name is
+    // empty or no definition with that name is found.
+    if (StringUtils.isEmpty(definitionName)) {
+      // This Response has to be the method result. It used to be built and discarded,
+      // so the lookup below ran even for an empty name.
+      Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build()
+    } else {
+      val metricSourceDefinition = metricDefinitionService.getDefinitionByName(definitionName)
+      if (metricSourceDefinition != null) {
+        Response.ok.entity(metricSourceDefinition).build()
+      } else {
+        Response.ok.entity(Map("message" -> "Definition not found")).build()
+      }
+    }
+  }
+
+ @GET
+ @Produces(Array(APPLICATION_JSON))
+ def getAllMetricDefinitions: Response = {
+ val metricSourceDefinitionMap: List[MetricSourceDefinition] = metricDefinitionService.getDefinitions
+ Response.ok.entity(metricSourceDefinitionMap).build()
+ }
+
+ @GET
+ @Path("/keys")
+ @Produces(Array(APPLICATION_JSON))
+ def getMetricKeys: Response = {
+ val metricKeyMap: Map[String, Set[MetricKey]] = metricDefinitionService.getMetricKeys
+ Response.ok.entity(metricKeyMap).build()
}
+  /**
+   * Saves a new metric source definition.
+   *
+   * @param definition request entity; rejected with a message when null
+   * @return a JSON message stating whether the definition was saved
+   */
+  @POST
+  @Produces(Array(APPLICATION_JSON))
+  def defaultPost(definition: MetricSourceDefinition) : Response = {
+    if (definition == null) {
+      // This Response has to be the method result. It used to be built and discarded,
+      // so a null definition was still handed to addDefinition.
+      Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build()
+    } else if (metricDefinitionService.addDefinition(definition)) {
+      Response.ok.entity(Map("message" -> "Definition saved")).build()
+    } else {
+      Response.ok.entity(Map("message" -> "Definition could not be saved")).build()
+    }
+  }
+
+  /**
+   * Updates an existing metric source definition.
+   *
+   * @param definition request entity; rejected with a message when null
+   * @return a JSON message stating whether the definition was updated
+   */
+  @PUT
+  @Produces(Array(APPLICATION_JSON))
+  def defaultPut(definition: MetricSourceDefinition) : Response = {
+    if (definition == null) {
+      // This Response has to be the method result. It used to be built and discarded,
+      // so a null definition was still handed to updateDefinition.
+      Response.ok.entity(Map("message" -> "Definition content cannot be empty.")).build()
+    } else if (metricDefinitionService.updateDefinition(definition)) {
+      Response.ok.entity(Map("message" -> "Definition updated")).build()
+    } else {
+      Response.ok.entity(Map("message" -> "Definition could not be updated")).build()
+    }
+  }
+
+  /**
+   * Deletes the metric source definition with the given name.
+   *
+   * @param definitionName value of the "name" path parameter; rejected with a message when empty
+   * @return a JSON message stating whether the definition was deleted
+   */
+  @DELETE
+  @Produces(Array(APPLICATION_JSON))
+  @Path("/{name}")
+  def defaultDelete(@PathParam("name") definitionName: String): Response = {
+    if (StringUtils.isEmpty(definitionName)) {
+      // This Response has to be the method result. It used to be built and discarded,
+      // so the delete below was still attempted with an empty name.
+      Response.ok.entity(Map("message" -> "Definition name cannot be empty. Use query parameter 'name'")).build()
+    } else if (metricDefinitionService.deleteDefinitionByName(definitionName)) {
+      Response.ok.entity(Map("message" -> "Definition deleted")).build()
+    } else {
+      Response.ok.entity(Map("message" -> "Definition could not be deleted")).build()
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
index 22fe0ac..fd55b64 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/resource/RootResource.scala
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.metrics.adservice.resource
+import java.time.LocalDateTime
+
import javax.ws.rs.core.MediaType.APPLICATION_JSON
import javax.ws.rs.core.Response
import javax.ws.rs.{GET, Path, Produces}
@@ -29,7 +31,8 @@ class RootResource {
@Produces(Array(APPLICATION_JSON))
@GET
def default: Response = {
- Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> DateTime.now.toString("MM-dd-yyyy hh:mm"))).build()
+ // Service banner: reports the service name and the current local time.
+ // The formatter used to be created but never applied, so a raw LocalDateTime
+ // was placed in the map instead of the intended "yyyy/MM/dd HH:mm" string.
+ val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm")
+ Response.ok.entity(Map("name" -> "anomaly-detection-service", "today" -> LocalDateTime.now.format(dtf))).build()
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
index 8e6f511..2cfa30f 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryService.scala
@@ -18,9 +18,9 @@
package org.apache.ambari.metrics.adservice.service
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance
+import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance
-trait ADQueryService {
+trait ADQueryService extends AbstractADService{
/**
* API to return list of single metric anomalies satisfying a set of conditions from the anomaly store.
@@ -30,5 +30,5 @@ trait ADQueryService {
* @param limit Maximum number of anomaly metrics that need to be returned based on anomaly score.
* @return
*/
- def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance]
+ def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
index e5efa44..3b49208 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/ADQueryServiceImpl.scala
@@ -16,11 +16,30 @@
* limitations under the License.
*/
package org.apache.ambari.metrics.adservice.service
+import org.apache.ambari.metrics.adservice.db.AdAnomalyStoreAccessor
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.SingleMetricAnomalyInstance
+import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance
+import org.slf4j.{Logger, LoggerFactory}
+import com.google.inject.{Inject, Singleton}
+
+@Singleton
class ADQueryServiceImpl extends ADQueryService {
+ val LOG : Logger = LoggerFactory.getLogger(classOf[ADQueryServiceImpl])
+
+ @Inject
+ var adAnomalyStoreAccessor: AdAnomalyStoreAccessor = _
+
+ /**
+ * Initialize Service
+ */
+ override def initialize(): Unit = {
+ LOG.info("Initializing AD Query Service...")
+ adAnomalyStoreAccessor.initialize()
+ LOG.info("Successfully initialized AD Query Service.")
+ }
+
/**
* Implementation to return list of anomalies satisfying a set of conditions from the anomaly store.
*
@@ -30,8 +49,8 @@ class ADQueryServiceImpl extends ADQueryService {
* @param limit Maximum number of anomaly metrics that need to be returned based on anomaly score.
* @return
*/
- override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[SingleMetricAnomalyInstance] = {
- val anomalies = List.empty[SingleMetricAnomalyInstance]
+ override def getTopNAnomaliesByType(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): List[MetricAnomalyInstance] = {
+ val anomalies = adAnomalyStoreAccessor.getMetricAnomalies(anomalyType, startTime, endTime, limit)
anomalies
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala
new file mode 100644
index 0000000..56bb999
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/service/AbstractADService.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.service
+
+trait AbstractADService {
+
+ /**
+ * Initialize Service
+ */
+ def initialize(): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
index 63cf8c7..56ca2c1 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/pointintime/PointInTimeAnomalyInstance.scala
@@ -23,7 +23,7 @@ import org.apache.ambari.metrics.adservice.common.Season
import org.apache.ambari.metrics.adservice.metadata.MetricKey
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance}
class PointInTimeAnomalyInstance(val metricKey: MetricKey,
val timestamp: Long,
@@ -31,7 +31,7 @@ class PointInTimeAnomalyInstance(val metricKey: MetricKey,
val methodType: AnomalyDetectionMethod,
val anomalyScore: Double,
val anomalousSeason: Season,
- val modelParameters: String) extends SingleMetricAnomalyInstance {
+ val modelParameters: String) extends MetricAnomalyInstance {
override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
index 3fc0d6f..7392d59 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/subsystem/trend/TrendAnomalyInstance.scala
@@ -21,7 +21,7 @@ import org.apache.ambari.metrics.adservice.common.{Season, TimeRange}
import org.apache.ambari.metrics.adservice.metadata.MetricKey
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.{AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.{AnomalyType, MetricAnomalyInstance}
case class TrendAnomalyInstance (metricKey: MetricKey,
anomalousPeriod: TimeRange,
@@ -29,7 +29,7 @@ case class TrendAnomalyInstance (metricKey: MetricKey,
methodType: AnomalyDetectionMethod,
anomalyScore: Double,
seasonInfo: Season,
- modelParameters: String) extends SingleMetricAnomalyInstance {
+ modelParameters: String) extends MetricAnomalyInstance {
override val anomalyType: AnomalyType = AnomalyType.POINT_IN_TIME
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
index 2a4999c..e38ea40 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/app/DefaultADResourceSpecTest.scala
@@ -17,6 +17,8 @@
*/
package org.apache.ambari.metrics.adservice.app
+import java.time.LocalDateTime
+
import javax.ws.rs.client.Client
import javax.ws.rs.core.MediaType.APPLICATION_JSON
@@ -37,7 +39,8 @@ class DefaultADResourceSpecTest extends FunSpec with Matchers {
withAppRunning(classOf[AnomalyDetectionApp], Resources.getResource("config.yml").getPath) { rule =>
val json = client.target(s"http://localhost:${rule.getLocalPort}/anomaly")
.request().accept(APPLICATION_JSON).buildGet().invoke(classOf[String])
- val now = DateTime.now.toString("MM-dd-yyyy hh:mm")
+ val dtf = java.time.format.DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm")
+ val now = LocalDateTime.now
assert(json == "{\"message\":\"Anomaly Detection Service!\"," + "\"today\":\"" + now + "\"}")
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
index bd38e9a..79366b1 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/AMSMetadataProviderTest.scala
@@ -18,26 +18,32 @@
package org.apache.ambari.metrics.adservice.metadata
+import java.util
+
import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
import org.scalatest.FunSuite
class AMSMetadataProviderTest extends FunSuite {
test("testFromTimelineMetricKey") {
- val timelineMetricKeys: java.util.Set[TimelineMetricKey] = new java.util.HashSet[TimelineMetricKey]()
+ val timelineMetricKeys: java.util.Set[java.util.Map[String, String]] = new java.util.HashSet[java.util.Map[String, String]]()
val uuid: Array[Byte] = Array.empty[Byte]
for (i <- 1 to 3) {
- val key: TimelineMetricKey = new TimelineMetricKey("M" + i, "App", null, "H", uuid)
- timelineMetricKeys.add(key)
+ val keyMap: java.util.Map[String, String] = new util.HashMap[String, String]()
+ keyMap.put("metricName", "M" + i)
+ keyMap.put("appId", "App")
+ keyMap.put("hostname", "H")
+ keyMap.put("uuid", new String(uuid))
+ timelineMetricKeys.add(keyMap)
}
val aMSMetadataProvider : ADMetadataProvider = new ADMetadataProvider(new MetricCollectorConfiguration)
- val metricKeys : Set[MetricKey] = aMSMetadataProvider.fromTimelineMetricKey(timelineMetricKeys)
+ val metricKeys : Set[MetricKey] = aMSMetadataProvider.getMetricKeys(timelineMetricKeys)
assert(metricKeys.size == 3)
}
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
index 0149673..c4d4dbc 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/metadata/MetricSourceDefinitionTest.scala
@@ -20,6 +20,10 @@ package org.apache.ambari.metrics.adservice.metadata
import org.apache.commons.lang.SerializationUtils
import org.scalatest.FunSuite
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+
class MetricSourceDefinitionTest extends FunSuite {
test("createNewMetricSourceDefinition") {
@@ -65,7 +69,12 @@ class MetricSourceDefinitionTest extends FunSuite {
}
test("serializeDeserialize") {
- val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "testAppId", MetricSourceDefinitionType.API)
+
+ val msd : MetricSourceDefinition = new MetricSourceDefinition("testDefinition", "A1", MetricSourceDefinitionType.API)
+ msd.hosts = List("h1")
+ msd.addMetricDefinition(MetricDefinition("M1", null, List("h2")))
+ msd.addMetricDefinition(MetricDefinition("M1", "A2", null))
+
val msdByteArray: Array[Byte] = SerializationUtils.serialize(msd)
assert(msdByteArray.nonEmpty)
@@ -73,5 +82,10 @@ class MetricSourceDefinitionTest extends FunSuite {
assert(msd2 != null)
assert(msd == msd2)
+ val mapper : ObjectMapper = new ObjectMapper()
+ mapper.registerModule(new ADServiceScalaModule)
+
+ System.out.print(mapper.writeValueAsString(msd))
+
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/pom.xml b/ambari-metrics/ambari-metrics-common/pom.xml
index de49235..3c32b73 100644
--- a/ambari-metrics/ambari-metrics-common/pom.xml
+++ b/ambari-metrics/ambari-metrics-common/pom.xml
@@ -27,12 +27,6 @@
<artifactId>ambari-metrics-common</artifactId>
<name>Ambari Metrics Common</name>
- <properties>
- <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
- <hbase.version>1.1.2.2.6.0.3-8</hbase.version>
- <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
- </properties>
-
<build>
<plugins>
<plugin>
@@ -143,45 +137,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-core</artifactId>
- <version>${phoenix.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1-jetty</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
<version>2.10.0</version>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java
deleted file mode 100644
index 7619811..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricKey.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.metrics2.sink.timeline;
-
-import org.apache.commons.lang.StringUtils;
-
-public class TimelineMetricKey {
- public String metricName;
- public String appId;
- public String instanceId = null;
- public String hostName;
- public byte[] uuid;
-
- public TimelineMetricKey(String metricName, String appId, String instanceId, String hostName, byte[] uuid) {
- this.metricName = metricName;
- this.appId = appId;
- this.instanceId = instanceId;
- this.hostName = hostName;
- this.uuid = uuid;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- TimelineMetricKey that = (TimelineMetricKey) o;
-
- if (!metricName.equals(that.metricName)) return false;
- if (!appId.equals(that.appId)) return false;
- if (!hostName.equals(that.hostName)) return false;
- return (StringUtils.isNotEmpty(instanceId) ? instanceId.equals(that.instanceId) : StringUtils.isEmpty(that.instanceId));
- }
-
- @Override
- public int hashCode() {
- int result = metricName.hashCode();
- result = 31 * result + (appId != null ? appId.hashCode() : 0);
- result = 31 * result + (instanceId != null ? instanceId.hashCode() : 0);
- result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
deleted file mode 100644
index 72e5fb5..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/ConnectionProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.query;
-
-
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-
-/**
- *
- */
-public interface ConnectionProvider {
- public Connection getConnection() throws SQLException;
- public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException;
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
deleted file mode 100644
index a28a433..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/DefaultPhoenixDataSource.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.metrics2.sink.timeline.query;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
-
- static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
- private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
- private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
- private static final String ZNODE_PARENT = "zookeeper.znode.parent";
-
- private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
- private final String url;
-
- private Configuration hbaseConf;
-
- public DefaultPhoenixDataSource(Configuration hbaseConf) {
- this.hbaseConf = hbaseConf;
- String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
- String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
- String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
- if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
- throw new IllegalStateException("Unable to find Zookeeper quorum to " +
- "access HBase store using Phoenix.");
- }
-
- url = String.format(connectionUrl,
- zookeeperQuorum,
- zookeeperClientPort,
- znodeParent);
- }
-
- /**
- * Get HBaseAdmin for table ops.
- * @return @HBaseAdmin
- * @throws IOException
- */
- public HBaseAdmin getHBaseAdmin() throws IOException {
- return (HBaseAdmin) ConnectionFactory.createConnection(hbaseConf).getAdmin();
- }
-
- /**
- * Get JDBC connection to HBase store. Assumption is that the hbase
- * configuration is present on the classpath and loaded by the caller into
- * the Configuration object.
- * Phoenix already caches the HConnection between the client and HBase
- * cluster.
- *
- * @return @java.sql.Connection
- */
- public Connection getConnection() throws SQLException {
-
- LOG.debug("Metric store connection url: " + url);
- try {
- return DriverManager.getConnection(url);
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
-
- throw e;
- }
- }
-
- public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory)
- throws SQLException, InterruptedException {
- RetryCounter retryCounter = retryCounterFactory.create();
- while (true) {
- try{
- return getConnection();
- } catch (SQLException e) {
- if(!retryCounter.shouldRetry()){
- LOG.error("HBaseAccessor getConnection failed after "
- + retryCounter.getMaxAttempts() + " attempts");
- throw e;
- }
- }
- retryCounter.sleepUntilNextRetry();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
deleted file mode 100644
index 194c769..0000000
--- a/ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/query/PhoenixConnectionProvider.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package org.apache.hadoop.metrics2.sink.timeline.query;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-
-import java.io.IOException;
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-public interface PhoenixConnectionProvider extends ConnectionProvider {
- /**
- * Get HBaseAdmin for the Phoenix connection
- * @return
- * @throws IOException
- */
- HBaseAdmin getHBaseAdmin() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
index e90a97f..20b344f 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricsService.java
@@ -42,6 +42,7 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -52,7 +53,6 @@ import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
@@ -497,30 +497,44 @@ public class HBaseTimelineMetricsService extends AbstractService implements Time
* @param metricName
* @param appId
* @param instanceId
- * @param hostname
+ * @param hosts
* @return
* @throws SQLException
* @throws IOException
*/
@Override
- public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
+ public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts)
+ throws SQLException, IOException {
+ Set<Map<String, String>> timelineMetricKeys = new HashSet<>();
- if (StringUtils.isEmpty(hostname)) {
- Set<String> hosts = new HashSet<>();
+ if (CollectionUtils.isEmpty(hosts)) {
+ Set<String> hostsFromMetadata = new HashSet<>();
for (String host : metricMetadataManager.getHostedAppsCache().keySet()) {
if (metricMetadataManager.getHostedAppsCache().get(host).getHostedApps().contains(appId)) {
- hosts.add(host);
+ hostsFromMetadata.add(host);
}
}
- Set<TimelineMetricKey> timelineMetricKeys = new HashSet<>();
- for (String host : hosts) {
+ for (String host : hostsFromMetadata) {
byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host);
- timelineMetricKeys.add(new TimelineMetricKey(metricName, appId, instanceId, host, uuid));
+ Map<String, String> keyMap = new HashMap<>();
+ keyMap.put("metricName", metricName);
+ keyMap.put("appId", appId);
+ keyMap.put("hostname", host); // NOTE(review): instanceId is passed to getUuid() but is not stored in the map - confirm intended
+ keyMap.put("uuid", new String(uuid)); // NOTE(review): new String(byte[]) decodes with the platform default charset; lossy if uuid holds raw binary - confirm how consumers read it
+ timelineMetricKeys.add(keyMap);
}
return timelineMetricKeys;
} else {
- byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, hostname);
- return Collections.singleton(new TimelineMetricKey(metricName, appId, instanceId, hostname, uuid));
+ for (String host : hosts) { // caller supplied an explicit host list: build one key map per host
+ byte[] uuid = metricMetadataManager.getUuid(metricName, appId, instanceId, host);
+ Map<String, String> keyMap = new HashMap<>();
+ keyMap.put("metricName", metricName);
+ keyMap.put("appId", appId);
+ keyMap.put("hostname", host); // NOTE(review): instanceId is passed to getUuid() but is not stored in the map - confirm intended
+ keyMap.put("uuid", new String(uuid)); // NOTE(review): new String(byte[]) decodes with the platform default charset; lossy if uuid holds raw binary - confirm how consumers read it
+ timelineMetricKeys.add(keyMap);
+ }
+ return timelineMetricKeys;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 9d595a4..cf382f1 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -120,6 +120,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
@@ -139,8 +140,8 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
-import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource;
-import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.SplitByMetricNamesCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.sink.ExternalMetricsSink;
@@ -210,7 +211,7 @@ public class PhoenixHBaseAccessor {
private HashMap<String, String> tableTTL = new HashMap<>();
private final TimelineMetricConfiguration configuration;
- private List<InternalMetricsSource> rawMetricsSources;
+ private List<InternalMetricsSource> rawMetricsSources = new ArrayList<>();
public PhoenixHBaseAccessor(PhoenixConnectionProvider dataSource) {
this(TimelineMetricConfiguration.getInstance(), dataSource);
@@ -459,6 +460,23 @@ public class PhoenixHBaseAccessor {
return mapper.readValue(json, metricValuesTypeRef);
}
+  /** Opens a connection via getConnection(), retrying on SQLException until the retry counter is exhausted, then rethrows. */
+  private Connection getConnectionRetryingOnException() throws SQLException, InterruptedException {
+    RetryCounter retryCounter = retryCounterFactory.create();
+    while (true) {
+      try {
+        return getConnection();
+      } catch (SQLException e) {
+        if (!retryCounter.shouldRetry()) {
+          // Pass the last failure to the logger so the log shows the cause, not only the attempt count.
+          LOG.error("HBaseAccessor getConnection failed after " + retryCounter.getMaxAttempts() + " attempts", e);
+          throw e;
+        }
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+
/**
* Get JDBC connection to HBase store. Assumption is that the hbase
* configuration is present on the classpath and loaded by the caller into
@@ -491,7 +509,7 @@ public class PhoenixHBaseAccessor {
try {
LOG.info("Initializing metrics schema...");
- conn = dataSource.getConnectionRetryingOnException(retryCounterFactory);
+ conn = getConnectionRetryingOnException();
stmt = conn.createStatement();
// Metadata
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index f00bd91..349ef83 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
@@ -111,6 +110,6 @@ public interface TimelineMetricStore {
TimelineMetrics getAnomalyMetrics(String method, long startTime, long endTime, Integer limit) throws SQLException;
- Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException;
+ Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
new file mode 100644
index 0000000..391af27
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/ConnectionProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+/**
+ * Source of JDBC connections; {@link #getConnection()} may fail with an SQLException.
+ */
+public interface ConnectionProvider {
+  Connection getConnection() throws SQLException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
new file mode 100644
index 0000000..67afe6b
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/DefaultPhoenixDataSource.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/** Phoenix-backed {@link PhoenixConnectionProvider} whose JDBC URL is derived from an HBase {@link Configuration}. */
+public class DefaultPhoenixDataSource implements PhoenixConnectionProvider {
+  static final Log LOG = LogFactory.getLog(DefaultPhoenixDataSource.class);
+  private static final String ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.property.clientPort";
+  private static final String ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
+  private static final String ZNODE_PARENT = "zookeeper.znode.parent";
+
+  private static final String connectionUrl = "jdbc:phoenix:%s:%s:%s";
+  private final String url;
+  private final Configuration hbaseConf;
+  // Created lazily by getHBaseAdmin() and then shared; only accessed while holding this object's monitor.
+  private org.apache.hadoop.hbase.client.Connection hbaseConnection;
+
+  /** Builds the Phoenix URL from the ZooKeeper quorum, client port and znode parent; a missing quorum is fatal. */
+  public DefaultPhoenixDataSource(Configuration hbaseConf) {
+    this.hbaseConf = hbaseConf;
+    String zookeeperClientPort = hbaseConf.getTrimmed(ZOOKEEPER_CLIENT_PORT, "2181");
+    String zookeeperQuorum = hbaseConf.getTrimmed(ZOOKEEPER_QUORUM);
+    String znodeParent = hbaseConf.getTrimmed(ZNODE_PARENT, "/ams-hbase-unsecure");
+    if (zookeeperQuorum == null || zookeeperQuorum.isEmpty()) {
+      throw new IllegalStateException("Unable to find Zookeeper quorum to " +
+        "access HBase store using Phoenix.");
+    }
+    url = String.format(connectionUrl, zookeeperQuorum, zookeeperClientPort, znodeParent);
+  }
+
+  /**
+   * Get HBaseAdmin for table ops. The HBase connection behind it is created on
+   * first use and reused afterwards (re-created only if it was closed), instead
+   * of opening a new, never-closed connection on every call.
+   * @return an HBaseAdmin obtained from the shared HBase connection
+   * @throws IOException if the HBase connection or the admin cannot be created
+   */
+  public synchronized HBaseAdmin getHBaseAdmin() throws IOException {
+    if (hbaseConnection == null || hbaseConnection.isClosed()) {
+      hbaseConnection = ConnectionFactory.createConnection(hbaseConf);
+    }
+    return (HBaseAdmin) hbaseConnection.getAdmin();
+  }
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a new java.sql.Connection for the Phoenix URL built in the constructor
+   * @throws SQLException if the driver cannot connect; the failure is logged at WARN and rethrown
+   */
+  public Connection getConnection() throws SQLException {
+    LOG.debug("Metric store connection url: " + url);
+    try {
+      return DriverManager.getConnection(url);
+    } catch (SQLException e) {
+      LOG.warn("Unable to connect to HBase store using Phoenix.", e);
+      throw e;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
new file mode 100644
index 0000000..cacbcfb
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixConnectionProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query;
+
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import java.io.IOException;
+
+/** A {@link ConnectionProvider} that can additionally hand out an HBaseAdmin. */
+public interface PhoenixConnectionProvider extends ConnectionProvider {
+  /**
+   * Get HBaseAdmin for the Phoenix connection
+   * @return the HBaseAdmin instance
+   * @throws IOException if the admin cannot be obtained
+   */
+  HBaseAdmin getHBaseAdmin() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index db35686..dc401e6 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -27,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -69,7 +69,9 @@ import javax.xml.bind.annotation.XmlRootElement;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -516,7 +518,7 @@ public class TimelineWebServices {
@GET
@Path("/metrics/metadata/key")
@Produces({ MediaType.APPLICATION_JSON })
- public Set<TimelineMetricKey> getTimelineMetricKey(
+ public Set<Map<String, String>> getTimelineMetricKey(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@QueryParam("metricName") String metricName,
@@ -527,7 +529,11 @@ public class TimelineWebServices {
init(res);
try {
- return timelineMetricStore.getTimelineMetricKey(metricName, appId, instanceId, hostname);
+      // An empty/absent hostname yields an empty host list; otherwise hostname is split on commas.
+      List<String> hostList = StringUtils.isEmpty(hostname)
+        ? Collections.<String>emptyList()
+        : Arrays.asList(StringUtils.split(hostname, ","));
+      return timelineMetricStore.getTimelineMetricKeys(metricName, appId, instanceId, hostList);
} catch (Exception e) {
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
index 7b70a80..03205e7 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryServer.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
-import org.apache.hadoop.metrics2.sink.timeline.query.DefaultPhoenixDataSource;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultPhoenixDataSource;
import org.apache.zookeeper.ClientCnxn;
import org.easymock.EasyMock;
import org.junit.After;
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
index 9c55305..741bb3c 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/AbstractMiniHBaseClusterTest.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
-import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -224,17 +224,6 @@ public abstract class AbstractMiniHBaseClusterTest extends BaseTest {
return connection;
}
- @Override
- public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException {
- Connection connection = null;
- try {
- connection = DriverManager.getConnection(getUrl());
- } catch (SQLException e) {
- LOG.warn("Unable to connect to HBase store using Phoenix.", e);
- }
- return connection;
- }
-
});
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index 5d81faa..50ff656 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.DefaultCondition;
-import org.apache.hadoop.metrics2.sink.timeline.query.PhoenixConnectionProvider;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixConnectionProvider;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.phoenix.exception.PhoenixIOException;
import org.easymock.EasyMock;
@@ -96,10 +96,6 @@ public class PhoenixHBaseAccessorTest {
return null;
}
- @Override
- public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException {
- return null;
- }
};
accessor = new PhoenixHBaseAccessor(connectionProvider);
@@ -256,11 +252,6 @@ public class PhoenixHBaseAccessorTest {
public Connection getConnection() throws SQLException {
return connection;
}
-
- @Override
- public Connection getConnectionRetryingOnException(RetryCounterFactory retryCounterFactory) throws SQLException, InterruptedException {
- return connection;
- }
};
accessor = new PhoenixHBaseAccessor(connectionProvider);
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index 32cc813..9b79fa9 100644
--- a/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
@@ -126,7 +125,7 @@ public class TestTimelineMetricStore implements TimelineMetricStore {
}
@Override
- public Set<TimelineMetricKey> getTimelineMetricKey(String metricName, String appId, String instanceId, String hostname) throws SQLException, IOException {
+ public Set<Map<String, String>> getTimelineMetricKeys(String metricName, String appId, String instanceId, List<String> hosts) throws SQLException, IOException {
return Collections.emptySet();
}
}
[2/2] ambari git commit: AMBARI-22470 : Refine Metric Definition
Service and AD Query service. (avijayan)
Posted by av...@apache.org.
AMBARI-22470 : Refine Metric Definition Service and AD Query service. (avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/ba9be802
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/ba9be802
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/ba9be802
Branch: refs/heads/branch-3.0-ams
Commit: ba9be8028a7608521e1ea769a239276c3cc223bc
Parents: 0fcca47
Author: Aravindan Vijayan <av...@hortonworks.com>
Authored: Mon Nov 20 10:46:13 2017 -0800
Committer: Aravindan Vijayan <av...@hortonworks.com>
Committed: Mon Nov 20 10:46:13 2017 -0800
----------------------------------------------------------------------
.../pom.xml | 63 ++--
.../src/main/resources/config.yml | 2 +-
.../src/main/resources/hbase-site.xml | 286 +++++++++++++++++++
.../adservice/app/ADServiceScalaModule.scala | 50 ++++
.../adservice/app/AnomalyDetectionApp.scala | 10 +-
.../app/AnomalyDetectionAppConfig.scala | 4 +-
.../app/AnomalyDetectionAppModule.scala | 5 +-
.../configuration/HBaseConfiguration.scala | 3 +
.../adservice/db/AdAnomalyStoreAccessor.scala | 36 +++
.../db/AdMetadataStoreAccessorImpl.scala | 96 +++++++
.../adservice/db/ConnectionProvider.scala | 45 +++
.../adservice/db/DefaultPhoenixDataSource.scala | 79 +++++
.../adservice/db/LevelDbStoreAccessor.scala | 56 ----
.../adservice/db/MetadataDatasource.scala | 6 +
.../db/PhoenixAnomalyStoreAccessor.scala | 75 +++--
.../db/PhoenixConnectionProvider.scala | 66 +++++
.../adservice/db/PhoenixQueryConstants.scala | 12 +-
.../adservice/leveldb/LevelDBDatasource.scala | 17 +-
.../adservice/metadata/ADMetadataProvider.scala | 86 ++++--
.../metadata/InputMetricDefinitionParser.scala | 24 +-
.../adservice/metadata/MetricDefinition.scala | 2 +
.../metadata/MetricDefinitionService.scala | 16 +-
.../metadata/MetricDefinitionServiceImpl.scala | 73 +++--
.../metrics/adservice/metadata/MetricKey.scala | 3 +
.../metadata/MetricMetadataProvider.scala | 2 +-
.../adservice/model/MetricAnomalyInstance.scala | 32 +++
.../model/SingleMetricAnomalyInstance.scala | 29 --
.../adservice/resource/AnomalyResource.scala | 55 +++-
.../resource/MetricDefinitionResource.scala | 77 ++++-
.../adservice/resource/RootResource.scala | 5 +-
.../adservice/service/ADQueryService.scala | 6 +-
.../adservice/service/ADQueryServiceImpl.scala | 25 +-
.../adservice/service/AbstractADService.scala | 44 +++
.../PointInTimeAnomalyInstance.scala | 4 +-
.../subsystem/trend/TrendAnomalyInstance.scala | 4 +-
.../app/DefaultADResourceSpecTest.scala | 5 +-
.../metadata/AMSMetadataProviderTest.scala | 16 +-
.../metadata/MetricSourceDefinitionTest.scala | 16 +-
ambari-metrics/ambari-metrics-common/pom.xml | 45 ---
.../sink/timeline/TimelineMetricKey.java | 59 ----
.../sink/timeline/query/ConnectionProvider.java | 32 ---
.../query/DefaultPhoenixDataSource.java | 108 -------
.../query/PhoenixConnectionProvider.java | 31 --
.../timeline/HBaseTimelineMetricsService.java | 36 ++-
.../metrics/timeline/PhoenixHBaseAccessor.java | 26 +-
.../metrics/timeline/TimelineMetricStore.java | 3 +-
.../timeline/query/ConnectionProvider.java | 31 ++
.../query/DefaultPhoenixDataSource.java | 92 ++++++
.../query/PhoenixConnectionProvider.java | 31 ++
.../webapp/TimelineWebServices.java | 12 +-
.../TestApplicationHistoryServer.java | 2 +-
.../timeline/AbstractMiniHBaseClusterTest.java | 13 +-
.../timeline/PhoenixHBaseAccessorTest.java | 11 +-
.../timeline/TestTimelineMetricStore.java | 3 +-
54 files changed, 1424 insertions(+), 546 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 142f02f..c6927dd 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -34,10 +34,12 @@
<properties>
<scala.version>2.12.3</scala.version>
<scala.binary.version>2.11</scala.binary.version>
- <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
<jackson.version>2.9.1</jackson.version>
<dropwizard.version>1.2.0</dropwizard.version>
<spark.version>2.2.0</spark.version>
+ <hadoop.version>2.7.3.2.6.0.3-8</hadoop.version>
+ <hbase.version>1.1.2.2.6.0.3-8</hbase.version>
+ <phoenix.version>4.7.0.2.6.0.3-8</phoenix.version>
</properties>
<repositories>
@@ -64,6 +66,7 @@
<directory>src/main/resources</directory>
<includes>
<include>**/*.yml</include>
+ <include>**/*.xml</include>
<include>**/*.txt</include>
</includes>
</resource>
@@ -145,6 +148,28 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
+ <!-- Strip classes bundled inside phoenix-core; presumably they clash with the
+ versions this service depends on directly (TODO confirm). -->
+ <filter>
+ <artifact>org.apache.phoenix:phoenix-core</artifact>
+ <excludes>
+ <exclude>org/joda/time/**</exclude>
+ <exclude>com/codahale/metrics/**</exclude>
+ <exclude>com/google/common/collect/**</exclude>
+ </excludes>
+ </filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>com/sun/jersey/**</exclude>
+ </excludes>
+ </filter>
</filters>
</configuration>
<executions>
@@ -245,33 +270,25 @@
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
- <artifactId>phoenix-spark</artifactId>
- <version>4.10.0-HBase-1.1</version>
+ <artifactId>phoenix-core</artifactId>
+ <version>${phoenix.version}</version>
<exclusions>
<exclusion>
- <artifactId>jersey-server</artifactId>
- <groupId>com.sun.jersey</groupId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
- <artifactId>jersey-core</artifactId>
- <groupId>com.sun.jersey</groupId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
</exclusion>
<exclusion>
- <artifactId>jersey-client</artifactId>
+ <artifactId>jersey-core</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
<exclusion>
- <artifactId>jersey-guice</artifactId>
- <groupId>com.sun.jersey.contribs</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jersey-json</artifactId>
+ <artifactId>jersey-server</artifactId>
<groupId>com.sun.jersey</groupId>
</exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -379,6 +396,10 @@
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
+ <exclusion>
+ <artifactId>jersey-server</artifactId>
+ <groupId>org.glassfish.jersey.core</groupId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -444,6 +465,12 @@
<artifactId>leveldb</artifactId>
<version>0.9</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.scalaj/scalaj-http -->
+ <dependency>
+ <groupId>org.scalaj</groupId>
+ <artifactId>scalaj-http_2.12</artifactId>
+ <version>2.3.0</version>
+ </dependency>
<dependency>
<groupId>junit</groupId>
@@ -454,7 +481,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>21.0</version>
+ <version>18.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 9402f6e..7de06b4 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -27,7 +27,7 @@ metricsCollector:
hosts: host1,host2
port: 6188
protocol: http
- metadataEndpoint: /v1/timeline/metrics/metadata/keys
+ metadataEndpoint: /ws/v1/timeline/metrics/metadata/key
adQueryService:
anomalyDataTtl: 604800
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
new file mode 100644
index 0000000..66f0454
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/hbase-site.xml
@@ -0,0 +1,286 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<configuration>
+
+ <property>
+ <name>dfs.client.read.shortcircuit</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>hbase.client.scanner.caching</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <name>hbase.client.scanner.timeout.period</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>hbase.cluster.distributed</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.majorcompaction</name>
+ <value>0</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.max.filesize</name>
+ <value>4294967296</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.memstore.block.multiplier</name>
+ <value>4</value>
+ </property>
+
+ <property>
+ <name>hbase.hregion.memstore.flush.size</name>
+ <value>134217728</value>
+ </property>
+
+ <property>
+ <name>hbase.hstore.blockingStoreFiles</name>
+ <value>200</value>
+ </property>
+
+ <property>
+ <name>hbase.hstore.flusher.count</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>hbase.local.dir</name>
+ <value>${hbase.tmp.dir}/local</value>
+ </property>
+
+ <property>
+ <name>hbase.master.info.bindAddress</name>
+ <value>0.0.0.0</value>
+ </property>
+
+ <property>
+ <name>hbase.master.info.port</name>
+ <value>61310</value>
+ </property>
+
+ <property>
+ <name>hbase.master.normalizer.class</name>
+ <value>org.apache.hadoop.hbase.master.normalizer.SimpleRegionNormalizer</value>
+ </property>
+
+ <property>
+ <name>hbase.master.port</name>
+ <value>61300</value>
+ </property>
+
+ <property>
+ <name>hbase.master.wait.on.regionservers.mintostart</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>hbase.normalizer.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.normalizer.period</name>
+ <value>600000</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.global.memstore.lowerLimit</name>
+ <value>0.3</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.global.memstore.upperLimit</name>
+ <value>0.35</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.info.port</name>
+ <value>61330</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.port</name>
+ <value>61320</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.thread.compaction.large</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>hbase.regionserver.thread.compaction.small</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <name>hbase.replication</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.rootdir</name>
+ <value>file:///var/lib/ambari-metrics-collector/hbase</value>
+ </property>
+
+ <property>
+ <name>hbase.rpc.timeout</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>hbase.snapshot.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <name>hbase.superuser</name>
+ <value>activity_explorer,activity_analyzer</value>
+ </property>
+
+ <property>
+ <name>hbase.tmp.dir</name>
+ <value>/var/lib/ambari-metrics-collector/hbase-tmp</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.leaderport</name>
+ <value>61388</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.peerport</name>
+ <value>61288</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.clientPort</name>
+ <value>61181</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.dataDir</name>
+ <value>${hbase.tmp.dir}/zookeeper</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.property.tickTime</name>
+ <value>6000</value>
+ </property>
+
+ <property>
+ <name>hbase.zookeeper.quorum</name>
+ <value>c6401.ambari.apache.org</value>
+ <final>true</final>
+ </property>
+
+ <property>
+ <name>hfile.block.cache.size</name>
+ <value>0.3</value>
+ </property>
+
+ <property>
+ <name>phoenix.coprocessor.maxMetaDataCacheSize</name>
+ <value>20480000</value>
+ </property>
+
+ <property>
+ <name>phoenix.coprocessor.maxServerCacheTimeToLiveMs</name>
+ <value>60000</value>
+ </property>
+
+ <property>
+ <name>phoenix.groupby.maxCacheSize</name>
+ <value>307200000</value>
+ </property>
+
+ <property>
+ <name>phoenix.mutate.batchSize</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.keepAliveMs</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.maxGlobalMemoryPercentage</name>
+ <value>15</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.rowKeyOrderSaltedTable</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.spoolThresholdBytes</name>
+ <value>20971520</value>
+ </property>
+
+ <property>
+ <name>phoenix.query.timeoutMs</name>
+ <value>300000</value>
+ </property>
+
+ <property>
+ <name>phoenix.sequence.saltBuckets</name>
+ <value>2</value>
+ </property>
+
+ <property>
+ <name>phoenix.spool.directory</name>
+ <value>${hbase.tmp.dir}/phoenix-spool</value>
+ </property>
+
+ <property>
+ <name>zookeeper.session.timeout</name>
+ <value>120000</value>
+ </property>
+
+ <property>
+ <name>zookeeper.session.timeout.localHBaseCluster</name>
+ <value>120000</value>
+ </property>
+
+ <property>
+ <name>zookeeper.znode.parent</name>
+ <value>/ams-hbase-unsecure</value>
+ </property>
+
+ <property>
+ <name>hbase.use.dynamic.jars</name>
+ <value>false</value>
+ </property>
+
+ </configuration>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
new file mode 100644
index 0000000..8578a80
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/ADServiceScalaModule.scala
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.app
+
+import com.fasterxml.jackson.module.scala._
+import com.fasterxml.jackson.module.scala.deser.{ScalaNumberDeserializersModule, UntypedObjectDeserializerModule}
+import com.fasterxml.jackson.module.scala.introspect.{ScalaAnnotationIntrospector, ScalaAnnotationIntrospectorModule}
+
+/**
+ * Extended Jackson Module that fixes the Scala-Jackson BytecodeReadingParanamer issue.
+ *
+ * Mixes in the individual jackson-module-scala feature modules listed below and uses
+ * FixedScalaAnnotationIntrospectorModule for the annotation-introspector part.
+ */
+class ADServiceScalaModule extends JacksonModule
+  with IteratorModule
+  with EnumerationModule
+  with OptionModule
+  with SeqModule
+  with IterableModule
+  with TupleModule
+  with MapModule
+  with SetModule
+  with FixedScalaAnnotationIntrospectorModule
+  with UntypedObjectDeserializerModule
+  with EitherModule {
+
+  /** Name under which this module is reported to Jackson. */
+  override def getModuleName: String = "ADServiceScalaModule"
+
+}
+
+/**
+ * Shared instance of the module. Declared at top level (as a companion) rather than
+ * inside the class body, so it can be referenced directly as `ADServiceScalaModule`.
+ */
+object ADServiceScalaModule extends ADServiceScalaModule
+
+
+/**
+ * Jackson module fragment that adds, via `+=`, a setup step appending
+ * ScalaAnnotationIntrospector to the setup context's annotation introspectors.
+ */
+trait FixedScalaAnnotationIntrospectorModule extends JacksonModule {
+  this += { setupContext =>
+    setupContext.appendAnnotationIntrospector(ScalaAnnotationIntrospector)
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
index 8b3a829..2d0dbdf 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionApp.scala
@@ -21,6 +21,9 @@ import javax.ws.rs.Path
import javax.ws.rs.container.{ContainerRequestFilter, ContainerResponseFilter}
import org.apache.ambari.metrics.adservice.app.GuiceInjector.{withInjector, wrap}
+import org.apache.ambari.metrics.adservice.db.{AdAnomalyStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.metadata.MetricDefinitionService
+import org.apache.ambari.metrics.adservice.service.ADQueryService
import org.glassfish.jersey.filter.LoggingFilter
import com.codahale.metrics.health.HealthCheck
@@ -45,6 +48,11 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
injector.instancesOfType(classOf[HealthCheck]).foreach { h => env.healthChecks.register(h.getClass.getName, h) }
injector.instancesOfType(classOf[ContainerRequestFilter]).foreach { f => env.jersey().register(f) }
injector.instancesOfType(classOf[ContainerResponseFilter]).foreach { f => env.jersey().register(f) }
+
+ //Initialize Services
+ injector.getInstance(classOf[MetadataDatasource]).initialize
+ injector.getInstance(classOf[MetricDefinitionService]).initialize
+ injector.getInstance(classOf[ADQueryService]).initialize
}
env.jersey.register(jacksonJaxbJsonProvider)
env.jersey.register(new LoggingFilter)
@@ -53,7 +61,7 @@ class AnomalyDetectionApp extends Application[AnomalyDetectionAppConfig] {
private def jacksonJaxbJsonProvider: JacksonJaxbJsonProvider = {
val provider = new JacksonJaxbJsonProvider()
val objectMapper = new ObjectMapper()
- objectMapper.registerModule(DefaultScalaModule)
+ objectMapper.registerModule(new ADServiceScalaModule)
objectMapper.registerModule(new JodaModule)
objectMapper.configure(SerializationFeature.WRAP_ROOT_VALUE, false)
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index 93f6b28..f9ed4b2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -42,7 +42,7 @@ class AnomalyDetectionAppConfig extends Configuration {
private val metricCollectorConfiguration = new MetricCollectorConfiguration
/*
- Anomaly Service configuration
+ Anomaly Query Service configuration
*/
@Valid
private val adServiceConfiguration = new AdServiceConfiguration
@@ -54,7 +54,7 @@ class AnomalyDetectionAppConfig extends Configuration {
private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration
/*
- HBase Conf
+ AMS HBase Conf
*/
@JsonIgnore
def getHBaseConf : org.apache.hadoop.conf.Configuration = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index a896563..68e9df9 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -17,7 +17,7 @@
*/
package org.apache.ambari.metrics.adservice.app
-import org.apache.ambari.metrics.adservice.db.{AdMetadataStoreAccessor, LevelDbStoreAccessor, MetadataDatasource}
+import org.apache.ambari.metrics.adservice.db._
import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricDefinitionServiceImpl}
import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, MetricDefinitionResource, RootResource}
@@ -38,9 +38,10 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
bind(classOf[AnomalyResource])
bind(classOf[MetricDefinitionResource])
bind(classOf[RootResource])
- bind(classOf[AdMetadataStoreAccessor]).to(classOf[LevelDbStoreAccessor])
+ bind(classOf[AdMetadataStoreAccessor]).to(classOf[AdMetadataStoreAccessorImpl])
bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
bind(classOf[MetricDefinitionService]).to(classOf[MetricDefinitionServiceImpl])
bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
+ bind(classOf[AdAnomalyStoreAccessor]).to(classOf[PhoenixAnomalyStoreAccessor])
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
index a51a959..a95ff15 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/HBaseConfiguration.scala
@@ -28,6 +28,9 @@ object HBaseConfiguration {
var isInitialized: Boolean = false
val LOG : Logger = LoggerFactory.getLogger("HBaseConfiguration")
+ /**
+ * Initialize the hbase conf from hbase-site present in classpath.
+ */
def initConfigs(): Unit = {
if (!isInitialized) {
var classLoader: ClassLoader = Thread.currentThread.getContextClassLoader
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
new file mode 100644
index 0000000..676b09a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdAnomalyStoreAccessor.scala
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+import org.apache.ambari.metrics.adservice.model.MetricAnomalyInstance
+
+/**
+ * Trait for anomaly store accessor. (Phoenix)
+ *
+ * Declares two abstract operations:
+ *  - `initialize()`: setup hook that returns nothing; what it prepares is left to the
+ *    implementation.
+ *  - `getMetricAnomalies(anomalyType, startTime, endTime, limit)`: returns a list of
+ *    MetricAnomalyInstance for the requested anomaly type.
+ *    NOTE(review): presumably startTime/endTime bound the query window and limit caps
+ *    the number of results; units of the timestamps are not stated here. Confirm
+ *    against the implementing class.
+ */
+trait AdAnomalyStoreAccessor {
+
+ def initialize(): Unit
+
+ def getMetricAnomalies(anomalyType: AnomalyType,
+ startTime: Long,
+ endTime: Long,
+ limit: Int) : List[MetricAnomalyInstance]
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
new file mode 100644
index 0000000..7405459
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/AdMetadataStoreAccessorImpl.scala
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
+import org.apache.commons.lang.SerializationUtils
+
+import com.google.inject.Inject
+
+/**
+ * Implementation of the AdMetadataStoreAccessor on top of a [[MetadataDatasource]].
+ *
+ * Each definition is stored Java-serialized; its key is the UTF-8 bytes of the
+ * definition name.
+ */
+class AdMetadataStoreAccessorImpl extends AdMetadataStoreAccessor {
+
+  @Inject
+  var metadataDataSource: MetadataDatasource = _
+
+  /**
+   * @param metadataDataSource key-value store holding the serialized definitions
+   */
+  @Inject
+  def this(metadataDataSource: MetadataDatasource) = {
+    this()
+    this.metadataDataSource = metadataDataSource
+  }
+
+  /**
+   * Return all saved component definitions from DB.
+   *
+   * @return every stored value that deserializes to a non-null definition
+   */
+  override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
+    val valuesFromStore: List[MetadataDatasource#Value] = metadataDataSource.getAll
+    valuesFromStore
+      .map(value => SerializationUtils.deserialize(value).asInstanceOf[MetricSourceDefinition])
+      .filter(_ != null)
+  }
+
+  /**
+   * Save a set of component definitions.
+   * Every definition is attempted, even if an earlier one reports failure.
+   *
+   * @param metricSourceDefinitions Set of component definitions
+   * @return true only if every individual save reported success
+   */
+  override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
+    // map is strict on List, so all saves run before the results are combined.
+    metricSourceDefinitions.map(saveInputDefinition).forall(identity)
+  }
+
+  /**
+   * Save a component definition, overwriting whatever is stored under the same name.
+   *
+   * @param metricSourceDefinition component definition
+   * @return Success / Failure (store errors surface as exceptions, so this is true on return)
+   */
+  override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
+    val storeValue: MetadataDatasource#Value = SerializationUtils.serialize(metricSourceDefinition)
+    metadataDataSource.put(toStoreKey(metricSourceDefinition.definitionName), storeValue)
+    true
+  }
+
+  /**
+   * Delete a component definition.
+   *
+   * @param definitionName name of the component definition to remove
+   * @return true once the delete has been issued to the store
+   */
+  override def removeInputDefinition(definitionName: String): Boolean = {
+    metadataDataSource.delete(toStoreKey(definitionName))
+    true
+  }
+
+  /**
+   * Build the store key for a definition name.
+   * The charset is pinned to UTF-8 so that keys do not depend on the JVM's default
+   * charset; for ASCII names the bytes are the same as before.
+   */
+  private def toStoreKey(definitionName: String): MetadataDatasource#Key =
+    definitionName.getBytes(java.nio.charset.StandardCharsets.UTF_8)
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
new file mode 100644
index 0000000..cc02ed4
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/ConnectionProvider.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.db
+
+import java.sql.Connection
+import java.sql.SQLException
+
+/**
+ * Source of JDBC connections to the anomaly store.
+ */
+trait ConnectionProvider {
+
+  /**
+   * Obtain a connection to the store.
+   *
+   * @return a JDBC connection
+   * @throws SQLException declared via the annotation below so Java callers see it as checked
+   */
+  @throws[SQLException]
+  def getConnection: Connection
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
new file mode 100644
index 0000000..d9396de
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/DefaultPhoenixDataSource.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.commons.logging.LogFactory
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.client.ConnectionFactory
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.SQLException
+
+/**
+ * Logger and configuration constants used by the DefaultPhoenixDataSource class.
+ */
+object DefaultPhoenixDataSource {
+
+  private[db] val LOG = LogFactory.getLog(classOf[DefaultPhoenixDataSource])
+
+  // Keys looked up in the HBase configuration.
+  private val ZOOKEEPER_CLIENT_PORT: String = "hbase.zookeeper.property.clientPort"
+  private val ZOOKEEPER_QUORUM: String = "hbase.zookeeper.quorum"
+  private val ZNODE_PARENT: String = "zookeeper.znode.parent"
+
+  // Format template for the Phoenix JDBC url; it takes three string arguments.
+  private val connectionUrl: String = "jdbc:phoenix:%s:%s:%s"
+}
+
+/**
+ * Phoenix connection provider whose JDBC url is built from the ZooKeeper settings
+ * (quorum, client port, znode parent) found in the supplied HBase configuration.
+ *
+ * Construction throws IllegalStateException when no ZooKeeper quorum is configured.
+ *
+ * @param hbaseConf HBase configuration to read the ZooKeeper settings from
+ */
+class DefaultPhoenixDataSource(var hbaseConf: Configuration) extends PhoenixConnectionProvider {
+
+  val zookeeperClientPort: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_CLIENT_PORT, "2181")
+  val zookeeperQuorum: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZOOKEEPER_QUORUM)
+  val znodeParent: String = hbaseConf.getTrimmed(DefaultPhoenixDataSource.ZNODE_PARENT, "/ams-hbase-unsecure")
+
+  // Fail fast: without a quorum no usable url can be built.
+  if (zookeeperQuorum == null || zookeeperQuorum.isEmpty) {
+    throw new IllegalStateException("Unable to find Zookeeper quorum to access HBase store using Phoenix.")
+  }
+
+  // Built once after validation; it never changes afterwards, hence a val.
+  private val url: String =
+    String.format(DefaultPhoenixDataSource.connectionUrl, zookeeperQuorum, zookeeperClientPort, znodeParent)
+
+  /**
+   * Get HBaseAdmin for table ops.
+   *
+   * NOTE(review): each call creates a new HBase connection that this class does not
+   * keep a reference to, so nothing here closes it -- confirm callers' expectations.
+   *
+   * @return the admin obtained from a freshly created HBase connection
+   * @throws IOException if the connection or the admin cannot be obtained
+   */
+  @throws[IOException]
+  override def getHBaseAdmin: HBaseAdmin = ConnectionFactory.createConnection(hbaseConf).getAdmin.asInstanceOf[HBaseAdmin]
+
+  /**
+   * Get JDBC connection to HBase store. Assumption is that the hbase
+   * configuration is present on the classpath and loaded by the caller into
+   * the Configuration object.
+   * Phoenix already caches the HConnection between the client and HBase
+   * cluster.
+   *
+   * @return a java.sql.Connection for the url computed at construction
+   * @throws SQLException rethrown, after a warning is logged, when the driver cannot connect
+   */
+  @throws[SQLException]
+  override def getConnection: Connection = {
+    // Skip building the message when debug logging is off.
+    if (DefaultPhoenixDataSource.LOG.isDebugEnabled) {
+      DefaultPhoenixDataSource.LOG.debug("Metric store connection url: " + url)
+    }
+    try DriverManager.getConnection(url)
+    catch {
+      case e: SQLException =>
+        DefaultPhoenixDataSource.LOG.warn("Unable to connect to HBase store using Phoenix.", e)
+        throw e
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
deleted file mode 100644
index baad57d..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/LevelDbStoreAccessor.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.ambari.metrics.adservice.db
-
-import org.apache.ambari.metrics.adservice.metadata.MetricSourceDefinition
-
-import com.google.inject.Inject
-
-class LevelDbStoreAccessor extends AdMetadataStoreAccessor{
-
- @Inject
- var levelDbDataSource : MetadataDatasource = _
-
- @Inject
- def this(levelDbDataSource: MetadataDatasource) = {
- this
- this.levelDbDataSource = levelDbDataSource
- }
-
- /**
- * Return all saved component definitions from DB.
- *
- * @return
- */
- override def getSavedInputDefinitions: List[MetricSourceDefinition] = {
- List.empty[MetricSourceDefinition]
- }
-
- /**
- * Save a set of component definitions
- *
- * @param metricSourceDefinitions Set of component definitions
- * @return Success / Failure
- */
-override def saveInputDefinitions(metricSourceDefinitions: List[MetricSourceDefinition]): Boolean = {
- true
-}
-
- /**
- * Save a component definition
- *
- * @param metricSourceDefinition component definition
- * @return Success / Failure
- */
- override def saveInputDefinition(metricSourceDefinition: MetricSourceDefinition): Boolean = {
- true
- }
-
- /**
- * Delete a component definition
- *
- * @param definitionName component definition
- * @return
- */
- override def removeInputDefinition(definitionName: String): Boolean = {
- true
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
index aa6694a..7b223a2 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
@@ -44,6 +44,12 @@ trait MetadataDatasource {
*/
def get(key: Key): Option[Value]
+ /**
+ * This function obtains all the values
+ *
+ * @return the list of values
+ */
+ def getAll: List[Value]
/**
* This function associates a key to a value, overwriting if necessary
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
index 36aea21..147d1f7 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixAnomalyStoreAccessor.scala
@@ -23,48 +23,60 @@ import java.util.concurrent.TimeUnit.SECONDS
import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
import org.apache.ambari.metrics.adservice.common._
import org.apache.ambari.metrics.adservice.configuration.HBaseConfiguration
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.metadata.{MetricDefinitionService, MetricKey}
import org.apache.ambari.metrics.adservice.model.AnomalyDetectionMethod.AnomalyDetectionMethod
import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, SingleMetricAnomalyInstance}
+import org.apache.ambari.metrics.adservice.model.{AnomalyDetectionMethod, AnomalyType, MetricAnomalyInstance}
import org.apache.ambari.metrics.adservice.subsystem.pointintime.PointInTimeAnomalyInstance
import org.apache.ambari.metrics.adservice.subsystem.trend.TrendAnomalyInstance
import org.apache.hadoop.hbase.util.RetryCounterFactory
-import org.apache.hadoop.metrics2.sink.timeline.query.{DefaultPhoenixDataSource, PhoenixConnectionProvider}
+import org.slf4j.{Logger, LoggerFactory}
import com.google.inject.Inject
-object PhoenixAnomalyStoreAccessor {
+/**
+ * Phoenix query handler class.
+ */
+class PhoenixAnomalyStoreAccessor extends AdAnomalyStoreAccessor {
@Inject
var configuration: AnomalyDetectionAppConfig = _
+ @Inject
+ var metricDefinitionService: MetricDefinitionService = _
+
var datasource: PhoenixConnectionProvider = _
+ val LOG : Logger = LoggerFactory.getLogger(classOf[PhoenixAnomalyStoreAccessor])
- def initAnomalyMetricSchema(): Unit = {
+ @Override
+ def initialize(): Unit = {
- val datasource: PhoenixConnectionProvider = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
+ datasource = new DefaultPhoenixDataSource(HBaseConfiguration.getHBaseConf)
val retryCounterFactory = new RetryCounterFactory(10, SECONDS.toMillis(3).toInt)
val ttl = configuration.getAdServiceConfiguration.getAnomalyDataTtl
try {
- var conn = datasource.getConnectionRetryingOnException(retryCounterFactory)
+ var conn : Connection = getConnectionRetryingOnException(retryCounterFactory)
var stmt = conn.createStatement
+ //Create Method parameters table.
val methodParametersSql = String.format(PhoenixQueryConstants.CREATE_METHOD_PARAMETERS_TABLE,
PhoenixQueryConstants.METHOD_PARAMETERS_TABLE_NAME)
stmt.executeUpdate(methodParametersSql)
+ //Create Point in Time anomaly table
val pointInTimeAnomalySql = String.format(PhoenixQueryConstants.CREATE_PIT_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.PIT_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(pointInTimeAnomalySql)
+ //Create Trend Anomaly table
val trendAnomalySql = String.format(PhoenixQueryConstants.CREATE_TREND_ANOMALY_METRICS_TABLE_SQL,
PhoenixQueryConstants.TREND_ANOMALY_METRICS_TABLE_NAME,
ttl.asInstanceOf[Object])
stmt.executeUpdate(trendAnomalySql)
+ //Create model snapshot table.
val snapshotSql = String.format(PhoenixQueryConstants.CREATE_MODEL_SNAPSHOT_TABLE,
PhoenixQueryConstants.MODEL_SNAPSHOT)
stmt.executeUpdate(snapshotSql)
@@ -75,11 +87,9 @@ object PhoenixAnomalyStoreAccessor {
}
}
- @throws[SQLException]
- def getConnection: Connection = datasource.getConnection
-
- def getSingleMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : scala.collection.mutable.MutableList[SingleMetricAnomalyInstance] = {
- val anomalies = scala.collection.mutable.MutableList.empty[SingleMetricAnomalyInstance]
+ @Override
+ def getMetricAnomalies(anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int) : List[MetricAnomalyInstance] = {
+ val anomalies = scala.collection.mutable.MutableList.empty[MetricAnomalyInstance]
val conn : Connection = getConnection
var stmt : PreparedStatement = null
var rs : ResultSet = null
@@ -98,8 +108,8 @@ object PhoenixAnomalyStoreAccessor {
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
- val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
- val anomalyInstance: SingleMetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
+ val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: MetricAnomalyInstance = new PointInTimeAnomalyInstance(metricKey, timestamp,
metricValue, methodType, anomalyScore, season, modelSnapshot)
anomalies.+=(anomalyInstance)
}
@@ -115,8 +125,8 @@ object PhoenixAnomalyStoreAccessor {
val anomalyScore: Double = rs.getDouble("ANOMALY_SCORE")
val modelSnapshot: String = rs.getString("MODEL_PARAMETERS")
- val metricKey: MetricKey = null //MetricManager.getMetricKeyFromUuid(uuid) //TODO
- val anomalyInstance: SingleMetricAnomalyInstance = TrendAnomalyInstance(metricKey,
+ val metricKey: MetricKey = metricDefinitionService.getMetricKeyFromUuid(uuid)
+ val anomalyInstance: MetricAnomalyInstance = TrendAnomalyInstance(metricKey,
TimeRange(anomalyStart, anomalyEnd),
TimeRange(referenceStart, referenceEnd),
methodType, anomalyScore, season, modelSnapshot)
@@ -127,11 +137,11 @@ object PhoenixAnomalyStoreAccessor {
case e: SQLException => throw e
}
- anomalies
+ anomalies.toList
}
@throws[SQLException]
- def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
+ private def prepareAnomalyMetricsGetSqlStatement(connection: Connection, anomalyType: AnomalyType, startTime: Long, endTime: Long, limit: Int): PreparedStatement = {
val sb = new StringBuilder
@@ -145,11 +155,11 @@ object PhoenixAnomalyStoreAccessor {
var stmt: java.sql.PreparedStatement = null
try {
stmt = connection.prepareStatement(sb.toString)
- var pos = 1
- pos += 1
+ var pos = 1
stmt.setLong(pos, startTime)
+ pos += 1
stmt.setLong(pos, endTime)
stmt.setFetchSize(limit)
@@ -157,9 +167,32 @@ object PhoenixAnomalyStoreAccessor {
} catch {
case e: SQLException =>
if (stmt != null)
- stmt
+ return stmt
throw e
}
stmt
}
+
+  /**
+   * Ask the current datasource for a connection.
+   *
+   * @return whatever connection the datasource hands out
+   */
+  @throws[SQLException]
+  private def getConnection: Connection = {
+    datasource.getConnection
+  }
+
+  /**
+   * Obtain a connection, retrying on SQLException.
+   *
+   * After a failed attempt the retry counter is consulted: if it reports that no
+   * retry should happen, an error is logged and the SQLException is rethrown;
+   * otherwise the counter's sleep is taken and another attempt is made.
+   *
+   * @param retryCounterFactory factory for the counter that governs the retries
+   * @return the first connection successfully obtained
+   */
+  @throws[SQLException]
+  @throws[InterruptedException]
+  private def getConnectionRetryingOnException(retryCounterFactory: RetryCounterFactory): Connection = {
+    val retryCounter = retryCounterFactory.create
+
+    @scala.annotation.tailrec
+    def attempt(): Connection = {
+      // None means "failed, but a retry is still allowed".
+      val outcome: Option[Connection] =
+        try {
+          Some(getConnection)
+        } catch {
+          case e: SQLException =>
+            if (!retryCounter.shouldRetry) {
+              LOG.error("HBaseAccessor getConnection failed after " + retryCounter.getMaxAttempts + " attempts")
+              throw e
+            }
+            None
+        }
+
+      outcome match {
+        case Some(connection) => connection
+        case None =>
+          retryCounter.sleepUntilNextRetry()
+          attempt()
+      }
+    }
+
+    attempt()
+  }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
new file mode 100644
index 0000000..1faf1ba
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixConnectionProvider.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import java.io.IOException
+
+/**
+ * A [[ConnectionProvider]] that can additionally supply an HBaseAdmin.
+ */
+trait PhoenixConnectionProvider extends ConnectionProvider {
+
+  /**
+   * Get HBaseAdmin for the Phoenix connection
+   *
+   * @return an HBaseAdmin
+   * @throws IOException declared via the annotation below so Java callers see it as checked
+   */
+  @throws[IOException]
+  def getHBaseAdmin: HBaseAdmin
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
index 5379c91..d9774e0 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/PhoenixQueryConstants.scala
@@ -54,25 +54,25 @@ object PhoenixQueryConstants {
val CREATE_TREND_ANOMALY_METRICS_TABLE_SQL: String = "CREATE TABLE IF NOT EXISTS %s (" +
"METRIC_UUID BINARY(20) NOT NULL, " +
+ "METHOD_NAME VARCHAR, " +
"ANOMALY_PERIOD_START UNSIGNED_LONG NOT NULL, " +
"ANOMALY_PERIOD_END UNSIGNED_LONG NOT NULL, " +
"TEST_PERIOD_START UNSIGNED_LONG NOT NULL, " +
"TEST_PERIOD_END UNSIGNED_LONG NOT NULL, " +
- "METHOD_NAME VARCHAR, " +
"SEASONAL_INFO VARCHAR, " +
"ANOMALY_SCORE DOUBLE, " +
"MODEL_PARAMETERS VARCHAR, " +
"DETECTION_TIME UNSIGNED_LONG " +
"CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, ANOMALY_PERIOD_START, ANOMALY_PERIOD_END, TEST_PERIOD_START, TEST_PERIOD_END)) " +
- "DATA_BLOCK_ENCODING='FAST_DIFF' IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
+ "DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, TTL=%s, COMPRESSION='SNAPPY'"
val CREATE_MODEL_SNAPSHOT_TABLE: String = "CREATE TABLE IF NOT EXISTS %s (" +
- "METRIC_UUID BINARY(20), " +
+ "METRIC_UUID BINARY(20) NOT NULL, " +
"METHOD_NAME VARCHAR, " +
"METHOD_TYPE VARCHAR, " +
- "PARAMETERS VARCHAR " +
- "SNAPSHOT_TIME UNSIGNED LONG NOT NULL " +
- "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME)) " +
+ "PARAMETERS VARCHAR, " +
+ "SNAPSHOT_TIME UNSIGNED_LONG NOT NULL " +
+ "CONSTRAINT pk PRIMARY KEY (METRIC_UUID, METHOD_NAME, SNAPSHOT_TIME)) " +
"DATA_BLOCK_ENCODING='FAST_DIFF', IMMUTABLE_ROWS=true, COMPRESSION='SNAPPY'"
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
index a34a60a..49ef272 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
@@ -42,7 +42,6 @@ class LevelDBDataSource() extends MetadataDatasource {
def this(appConfig: AnomalyDetectionAppConfig) = {
this
this.appConfig = appConfig
- initialize()
}
override def initialize(): Unit = {
@@ -83,6 +82,22 @@ class LevelDBDataSource() extends MetadataDatasource {
override def get(key: Key): Option[Value] = Option(db.get(key))
/**
+ * This function obtains all the values
+ *
+ * @return the list of values
+ */
+  override def getAll: List[Value] = {
+    val iterator = db.iterator()
+    // The iterator is closed in all cases; the original left it open after the scan.
+    try {
+      iterator.seekToFirst()
+      val values = List.newBuilder[Value]
+      while (iterator.hasNext) {
+        val entry: java.util.Map.Entry[Key, Value] = iterator.next()
+        values += entry.getValue
+      }
+      values.result()
+    } finally {
+      iterator.close()
+    }
+  }
+
+ /**
* This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
*
* @param toRemove which includes all the keys to be removed from the DataSource.
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
index 95b1b63..c277221 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/ADMetadataProvider.scala
@@ -17,15 +17,17 @@
package org.apache.ambari.metrics.adservice.metadata
-import java.net.{HttpURLConnection, URL}
+import javax.ws.rs.core.Response
import org.apache.ambari.metrics.adservice.configuration.MetricCollectorConfiguration
import org.apache.commons.lang.StringUtils
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricKey
+import org.slf4j.{Logger, LoggerFactory}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
+import scalaj.http.{Http, HttpRequest, HttpResponse}
+
/**
* Class to invoke Metrics Collector metadata API.
* TODO : Instantiate a sync thread that regularly updates the internal maps by reading off AMS metadata.
@@ -36,6 +38,7 @@ class ADMetadataProvider extends MetricMetadataProvider {
var metricCollectorPort: String = _
var metricCollectorProtocol: String = _
var metricMetadataPath: String = "/v1/timeline/metrics/metadata/keys"
+ val LOG : Logger = LoggerFactory.getLogger(classOf[ADMetadataProvider])
val connectTimeout: Int = 10000
val readTimeout: Int = 10000
@@ -52,10 +55,8 @@ class ADMetadataProvider extends MetricMetadataProvider {
metricMetadataPath = configuration.getMetadataEndpoint
}
- override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition,
- Set[MetricKey]], Set[MetricKey]) = {
+ override def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey] = {
- val keysMap = scala.collection.mutable.Map[MetricDefinition, Set[MetricKey]]()
val numDefinitions: Int = metricSourceDefinition.metricDefinitions.size
val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
@@ -64,52 +65,79 @@ class ADMetadataProvider extends MetricMetadataProvider {
for (host <- metricCollectorHosts) {
val metricKeys: Set[MetricKey] = getKeysFromMetricsCollector(metricCollectorProtocol, host, metricCollectorPort, metricMetadataPath, metricDef)
if (metricKeys != null) {
- keysMap += (metricDef -> metricKeys)
- metricKeySet.++(metricKeys)
+ metricKeySet.++=(metricKeys)
}
}
}
}
- (keysMap.toMap, metricKeySet.toSet)
+ metricKeySet.toSet
}
/**
- * Make Metrics Collector REST API call to fetch keys.
*
- * @param url
+ * @param protocol
+ * @param host
+ * @param port
+ * @param path
* @param metricDefinition
* @return
*/
def getKeysFromMetricsCollector(protocol: String, host: String, port: String, path: String, metricDefinition: MetricDefinition): Set[MetricKey] = {
- val url: String = protocol + "://" + host + port + "/" + path
+ val url: String = protocol + "://" + host + ":" + port + path
val mapper = new ObjectMapper() with ScalaObjectMapper
+
+ if (metricDefinition.hosts == null || metricDefinition.hosts.isEmpty) {
+ val request: HttpRequest = Http(url)
+ .param("metricName", metricDefinition.metricName)
+ .param("appId", metricDefinition.appId)
+ makeHttpGetCall(request, mapper)
+ } else {
+ val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
+
+ for (h <- metricDefinition.hosts) {
+ val request: HttpRequest = Http(url)
+ .param("metricName", metricDefinition.metricName)
+ .param("appId", metricDefinition.appId)
+ .param("hostname", h)
+
+ val metricKeys = makeHttpGetCall(request, mapper)
+ metricKeySet.++=(metricKeys)
+ }
+ metricKeySet.toSet
+ }
+ }
+
+ private def makeHttpGetCall(request: HttpRequest, mapper: ObjectMapper): Set[MetricKey] = {
+
try {
- val connection = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
- connection.setConnectTimeout(connectTimeout)
- connection.setReadTimeout(readTimeout)
- connection.setRequestMethod("GET")
- val inputStream = connection.getInputStream
- val content = scala.io.Source.fromInputStream(inputStream).mkString
- if (inputStream != null) inputStream.close()
- val metricKeySet: Set[MetricKey] = fromTimelineMetricKey(mapper.readValue[java.util.Set[TimelineMetricKey]](content))
- return metricKeySet
+ val result: HttpResponse[String] = request.asString
+ if (result.code == Response.Status.OK.getStatusCode) {
+ LOG.info("Successfully fetched metric keys from metrics collector")
+ val metricKeySet: java.util.Set[java.util.Map[String, String]] = mapper.readValue(result.body,
+ classOf[java.util.Set[java.util.Map[String, String]]])
+ getMetricKeys(metricKeySet)
+ } else {
+ LOG.error("Got an error when trying to fetch metric key from metrics collector. Code = " + result.code + ", Message = " + result.body)
+ }
} catch {
- case _: java.io.IOException | _: java.net.SocketTimeoutException => // handle this
+ case _: java.io.IOException | _: java.net.SocketTimeoutException => LOG.error("Unable to fetch metric keys from Metrics collector for : " + request.toString)
}
- null
+ Set.empty[MetricKey]
}
- def fromTimelineMetricKey(timelineMetricKeys: java.util.Set[TimelineMetricKey]): Set[MetricKey] = {
+
+ def getMetricKeys(timelineMetricKeys: java.util.Set[java.util.Map[String, String]]): Set[MetricKey] = {
val metricKeySet: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
val iter = timelineMetricKeys.iterator()
while (iter.hasNext) {
- val timelineMetricKey: TimelineMetricKey = iter.next()
- val metricKey: MetricKey = MetricKey(timelineMetricKey.metricName,
- timelineMetricKey.appId,
- timelineMetricKey.instanceId,
- timelineMetricKey.hostName,
- timelineMetricKey.uuid)
+ val timelineMetricKey: java.util.Map[String, String] = iter.next()
+ val metricKey: MetricKey = MetricKey(
+ timelineMetricKey.get("metricName"),
+ timelineMetricKey.get("appId"),
+ timelineMetricKey.get("instanceId"),
+ timelineMetricKey.get("hostname"),
+ timelineMetricKey.get("uuid").getBytes())
metricKeySet.add(metricKey)
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
index cc66c90..3c8ea84 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/InputMetricDefinitionParser.scala
@@ -19,6 +19,8 @@ package org.apache.ambari.metrics.adservice.metadata
import java.io.File
+import org.apache.ambari.metrics.adservice.app.ADServiceScalaModule
+
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
@@ -30,15 +32,19 @@ object InputMetricDefinitionParser {
return List.empty[MetricSourceDefinition]
}
val mapper = new ObjectMapper() with ScalaObjectMapper
-
- def metricSourceDefinitions: List[MetricSourceDefinition] =
- for {
- file <- getFilesInDirectory(directory)
- definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](file)
- if definition != null
- } yield definition
-
- metricSourceDefinitions
+ mapper.registerModule(new ADServiceScalaModule)
+ val metricSourceDefinitions: scala.collection.mutable.MutableList[MetricSourceDefinition] =
+ scala.collection.mutable.MutableList.empty[MetricSourceDefinition]
+
+ for (file <- getFilesInDirectory(directory)) {
+ val source = scala.io.Source.fromFile(file)
+ val lines = try source.mkString finally source.close()
+ val definition: MetricSourceDefinition = mapper.readValue[MetricSourceDefinition](lines)
+ if (definition != null) {
+ metricSourceDefinitions.+=(definition)
+ }
+ }
+ metricSourceDefinitions.toList
}
private def getFilesInDirectory(directory: String): List[File] = {
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
index 036867b..c668dfa 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinition.scala
@@ -19,6 +19,8 @@
package org.apache.ambari.metrics.adservice.metadata
import org.apache.commons.lang3.StringUtils
+
+import com.fasterxml.jackson.annotation.JsonIgnore
/*
{
"metric-name": "mem_free",
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
index 635dc60..52ce39e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionService.scala
@@ -17,7 +17,9 @@
package org.apache.ambari.metrics.adservice.metadata
-trait MetricDefinitionService {
+import org.apache.ambari.metrics.adservice.service.AbstractADService
+
+trait MetricDefinitionService extends AbstractADService{
/**
* Given a 'UUID', return the metric key associated with it.
@@ -27,6 +29,12 @@ trait MetricDefinitionService {
def getMetricKeyFromUuid(uuid: Array[Byte]) : MetricKey
/**
+ * Return all the definitions being tracked.
+ * @return List of all Metric Source Definitions being tracked.
+ */
+ def getDefinitions: List[MetricSourceDefinition]
+
+ /**
* Given a component definition name, return the definition associated with it.
* @param name component definition name
* @return
@@ -61,4 +69,10 @@ trait MetricDefinitionService {
*/
def getDefinitionByAppId(appId: String) : List[MetricSourceDefinition]
+ /**
+ * Return the mapping from definition name to the set of metric keys.
+ * @return Map of Metric Source Definition name to the set of metric keys associated with it.
+ */
+ def getMetricKeys: Map[String, Set[MetricKey]]
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
index c34d2dd..b9b4a7c 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricDefinitionServiceImpl.scala
@@ -32,31 +32,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
var configuration: AnomalyDetectionAppConfig = _
var metricMetadataProvider: MetricMetadataProvider = _
- var metricSourceDefinitionMap: Map[String, MetricSourceDefinition] = Map()
- var metricKeys: Set[MetricKey] = Set.empty[MetricKey]
- var metricDefinitionMetricKeyMap: Map[MetricDefinition, Set[MetricKey]] = Map()
+ val metricSourceDefinitionMap: scala.collection.mutable.Map[String, MetricSourceDefinition] = scala.collection.mutable.Map()
+ val metricDefinitionMetricKeyMap: scala.collection.mutable.Map[MetricSourceDefinition, Set[MetricKey]] = scala.collection.mutable.Map()
+ val metricKeys: scala.collection.mutable.Set[MetricKey] = scala.collection.mutable.Set.empty[MetricKey]
@Inject
def this (anomalyDetectionAppConfig: AnomalyDetectionAppConfig, metadataStoreAccessor: AdMetadataStoreAccessor) = {
this ()
adMetadataStoreAccessor = metadataStoreAccessor
configuration = anomalyDetectionAppConfig
- initializeService()
}
- def initializeService() : Unit = {
-
- //Create AD Metadata Schema
- //TODO Make sure AD Metadata DB is initialized here.
+ @Override
+ def initialize() : Unit = {
+ LOG.info("Initializing Metric Definition Service...")
//Initialize Metric Metadata Provider
metricMetadataProvider = new ADMetadataProvider(configuration.getMetricCollectorConfiguration)
- loadMetricSourceDefinitions()
- }
-
- def loadMetricSourceDefinitions() : Unit = {
-
//Load definitions from metadata store
val definitionsFromStore: List[MetricSourceDefinition] = adMetadataStoreAccessor.getSavedInputDefinitions
for (definition <- definitionsFromStore) {
@@ -71,14 +64,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
//Union the 2 sources, with DB taking precedence.
//Save new definition list to DB.
- metricSourceDefinitionMap = metricSourceDefinitionMap.++(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
+ metricSourceDefinitionMap.++=(combineDefinitionSources(definitionsFromConfig, definitionsFromStore))
- //Reach out to AMS Metadata and get Metric Keys. Pass in List<CD> and get back (Map<MD,Set<MK>>, Set<MK>)
+ //Reach out to AMS Metadata and get Metric Keys. Pass in MSD and get back Set<MK>
for (definition <- metricSourceDefinitionMap.values) {
- val (definitionKeyMap: Map[MetricDefinition, Set[MetricKey]], keys: Set[MetricKey])= metricMetadataProvider.getMetricKeysForDefinitions(definition)
- metricDefinitionMetricKeyMap = metricDefinitionMetricKeyMap.++(definitionKeyMap)
- metricKeys = metricKeys.++(keys)
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
}
+
+ LOG.info("Successfully initialized Metric Definition Service.")
}
def getMetricKeyFromUuid(uuid: Array[Byte]): MetricKey = {
@@ -92,16 +87,24 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
@Override
+ def getDefinitions: List[MetricSourceDefinition] = {
+ metricSourceDefinitionMap.values.toList
+ }
+
+ @Override
def getDefinitionByName(name: String): MetricSourceDefinition = {
if (!metricSourceDefinitionMap.contains(name)) {
LOG.warn("Metric Source Definition with name " + name + " not found")
+ null
+ } else {
+ metricSourceDefinitionMap.apply(name)
}
- metricSourceDefinitionMap.apply(name)
}
@Override
def addDefinition(definition: MetricSourceDefinition): Boolean = {
if (metricSourceDefinitionMap.contains(definition.definitionName)) {
+ LOG.info("Definition with name " + definition.definitionName + " already present.")
return false
}
definition.definitionSource = MetricSourceDefinitionType.API
@@ -109,6 +112,10 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
if (success) {
metricSourceDefinitionMap += definition.definitionName -> definition
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
+ LOG.info("Successfully created metric source definition : " + definition.definitionName)
}
success
}
@@ -116,16 +123,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
@Override
def updateDefinition(definition: MetricSourceDefinition): Boolean = {
if (!metricSourceDefinitionMap.contains(definition.definitionName)) {
+ LOG.warn("Metric Source Definition with name " + definition.definitionName + " not found")
return false
}
if (metricSourceDefinitionMap.apply(definition.definitionName).definitionSource != MetricSourceDefinitionType.API) {
return false
}
+ definition.definitionSource = MetricSourceDefinitionType.API
val success: Boolean = adMetadataStoreAccessor.saveInputDefinition(definition)
if (success) {
metricSourceDefinitionMap += definition.definitionName -> definition
+ val keys: Set[MetricKey] = metricMetadataProvider.getMetricKeysForDefinitions(definition)
+ metricDefinitionMetricKeyMap(definition) = keys
+ metricKeys.++=(keys)
+ LOG.info("Successfully updated metric source definition : " + definition.definitionName)
}
success
}
@@ -133,17 +146,22 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
@Override
def deleteDefinitionByName(name: String): Boolean = {
if (!metricSourceDefinitionMap.contains(name)) {
+ LOG.warn("Metric Source Definition with name " + name + " not found")
return false
}
val definition : MetricSourceDefinition = metricSourceDefinitionMap.apply(name)
if (definition.definitionSource != MetricSourceDefinitionType.API) {
+ LOG.warn("Cannot delete metric source definition which was not created through API.")
return false
}
val success: Boolean = adMetadataStoreAccessor.removeInputDefinition(name)
if (success) {
- metricSourceDefinitionMap += definition.definitionName -> definition
+ metricSourceDefinitionMap -= definition.definitionName
+ metricKeys.--=(metricDefinitionMetricKeyMap.apply(definition))
+ metricDefinitionMetricKeyMap -= definition
+ LOG.info("Successfully deleted metric source definition : " + name)
}
success
}
@@ -183,7 +201,6 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
this.adMetadataStoreAccessor = adMetadataStoreAccessor
}
-
/**
* Look into the Metric Definitions inside a Metric Source definition, and push down source level appId &
* hosts to Metric definition if they do not have an override.
@@ -202,7 +219,7 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
}
- if (metricDef.isValid && metricDef.hosts.isEmpty) {
+ if (metricDef.isValid && (metricDef.hosts == null || metricDef.hosts.isEmpty)) {
if (sourceLevelHostList != null && sourceLevelHostList.nonEmpty) {
metricDef.hosts = sourceLevelHostList
}
@@ -210,4 +227,16 @@ class MetricDefinitionServiceImpl extends MetricDefinitionService {
}
}
+ /**
+ * Return the mapping from definition name to the set of metric keys.
+ *
+ * @return Map of Metric Source Definition name to the set of metric keys associated with it.
+ */
+ override def getMetricKeys: Map[String, Set[MetricKey]] = {
+ val metricKeyMap: scala.collection.mutable.Map[String, Set[MetricKey]] = scala.collection.mutable.Map()
+ for (definition <- metricSourceDefinitionMap.values) {
+ metricKeyMap(definition.definitionName) = metricDefinitionMetricKeyMap.apply(definition)
+ }
+ metricKeyMap.toMap
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
index afad617..65c496e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricKey.scala
@@ -18,6 +18,9 @@
package org.apache.ambari.metrics.adservice.metadata
+import javax.xml.bind.annotation.XmlRootElement
+
+@XmlRootElement
case class MetricKey (metricName: String, appId: String, instanceId: String, hostname: String, uuid: Array[Byte]) {
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
index 5f9c0a0..b5ba15e 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/metadata/MetricMetadataProvider.scala
@@ -27,5 +27,5 @@ trait MetricMetadataProvider {
* @param metricSourceDefinition component definition
* @return
*/
- def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): (Map[MetricDefinition, Set[MetricKey]], Set[MetricKey])
+ def getMetricKeysForDefinitions(metricSourceDefinition: MetricSourceDefinition): Set[MetricKey]
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
new file mode 100644
index 0000000..248a380
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/MetricAnomalyInstance.scala
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.model
+
+import javax.xml.bind.annotation.XmlRootElement
+
+import org.apache.ambari.metrics.adservice.metadata.MetricKey
+import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
+
+@XmlRootElement
+abstract class MetricAnomalyInstance {
+
+ val metricKey: MetricKey
+ val anomalyType: AnomalyType
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/ba9be802/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
deleted file mode 100644
index 981a893..0000000
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/model/SingleMetricAnomalyInstance.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.metrics.adservice.model
-
-import org.apache.ambari.metrics.adservice.metadata.MetricKey
-import org.apache.ambari.metrics.adservice.model.AnomalyType.AnomalyType
-
-abstract class SingleMetricAnomalyInstance {
-
- val metricKey: MetricKey
- val anomalyType: AnomalyType
-
-}