You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by sw...@apache.org on 2017/11/03 20:44:52 UTC
ambari git commit: AMBARI-22365. Add storage support for storing
metric definitions using LevelDB. (swagle)
Repository: ambari
Updated Branches:
refs/heads/branch-3.0-ams 9431d568f -> dfa64cb99
AMBARI-22365. Add storage support for storing metric definitions using LevelDB. (swagle)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/dfa64cb9
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/dfa64cb9
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/dfa64cb9
Branch: refs/heads/branch-3.0-ams
Commit: dfa64cb996f15720289d38ed1efa75947ecc5768
Parents: 9431d56
Author: Siddharth Wagle <sw...@hortonworks.com>
Authored: Fri Nov 3 10:06:12 2017 -0700
Committer: Siddharth Wagle <sw...@hortonworks.com>
Committed: Fri Nov 3 10:06:12 2017 -0700
----------------------------------------------------------------------
.../pom.xml | 19 +++-
.../src/main/resources/config.yml | 8 ++
.../app/AnomalyDetectionAppConfig.scala | 11 +-
.../app/AnomalyDetectionAppModule.scala | 4 +-
.../MetricDefinitionDBConfiguration.scala | 38 +++++++
.../adservice/db/MetadataDatasource.scala | 73 +++++++++++++
.../adservice/leveldb/LevelDBDatasource.scala | 102 +++++++++++++++++++
.../leveldb/LevelDBDataSourceTest.scala | 57 +++++++++++
8 files changed, 306 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
index 44bdc1f..cfa8124 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/pom.xml
@@ -424,11 +424,18 @@
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
+
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson.version}</version>
+ <groupId>org.fusesource.leveldbjni</groupId>
+ <artifactId>leveldbjni-all</artifactId>
+ <version>1.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.iq80.leveldb</groupId>
+ <artifactId>leveldb</artifactId>
+ <version>0.9</version>
</dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -452,5 +459,11 @@
<version>2.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.8.4</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
index 920c50c..299a472 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/resources/config.yml
@@ -30,6 +30,14 @@ metricsCollector:
adQueryService:
anomalyDataTtl: 604800
+metricDefinitionDB:
+ # force checksum verification of all data that is read from the file system on behalf of a particular read
+ verifyChecksums: true
+ # raise an error as soon as it detects an internal corruption
+ performParanoidChecks: false
+ # Path to Level DB directory
+ dbDirPath: /var/lib/ambari-metrics-anomaly-detection/
+
#subsystemService:
# spark:
# pointInTime:
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
index c1ef0d1..aa20223 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppConfig.scala
@@ -20,10 +20,9 @@ package org.apache.ambari.metrics.adservice.app
import javax.validation.Valid
-import org.apache.ambari.metrics.adservice.configuration.{AdServiceConfiguration, HBaseConfiguration, MetricCollectorConfiguration, MetricDefinitionServiceConfiguration}
+import org.apache.ambari.metrics.adservice.configuration._
import com.fasterxml.jackson.annotation.JsonProperty
-
import io.dropwizard.Configuration
/**
@@ -46,6 +45,12 @@ class AnomalyDetectionAppConfig extends Configuration {
@Valid
private val adServiceConfiguration = new AdServiceConfiguration
+ /**
+ * LevelDB settings for metrics definitions
+ */
+ @Valid
+ private val metricDefinitionDBConfiguration = new MetricDefinitionDBConfiguration
+
/*
HBase Conf
*/
@@ -66,4 +71,6 @@ class AnomalyDetectionAppConfig extends Configuration {
@JsonProperty("metricsCollector")
def getMetricCollectorConfiguration: MetricCollectorConfiguration = metricCollectorConfiguration
+ @JsonProperty("metricDefinitionDB")
+ def getMetricDefinitionDBConfiguration: MetricDefinitionDBConfiguration = metricDefinitionDBConfiguration
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
index 7425a7e..28b2880 100644
--- a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/app/AnomalyDetectionAppModule.scala
@@ -17,13 +17,14 @@
*/
package org.apache.ambari.metrics.adservice.app
+import org.apache.ambari.metrics.adservice.db.MetadataDatasource
+import org.apache.ambari.metrics.adservice.leveldb.LevelDBDataSource
import org.apache.ambari.metrics.adservice.resource.{AnomalyResource, RootResource}
import org.apache.ambari.metrics.adservice.service.{ADQueryService, ADQueryServiceImpl}
import com.codahale.metrics.health.HealthCheck
import com.google.inject.AbstractModule
import com.google.inject.multibindings.Multibinder
-
import io.dropwizard.setup.Environment
class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environment) extends AbstractModule {
@@ -35,5 +36,6 @@ class AnomalyDetectionAppModule(config: AnomalyDetectionAppConfig, env: Environm
bind(classOf[AnomalyResource])
bind(classOf[RootResource])
bind(classOf[ADQueryService]).to(classOf[ADQueryServiceImpl])
+ bind(classOf[MetadataDatasource]).to(classOf[LevelDBDataSource])
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
new file mode 100644
index 0000000..79a350c
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/configuration/MetricDefinitionDBConfiguration.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.configuration
+
+import javax.validation.constraints.NotNull
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+class MetricDefinitionDBConfiguration {
+
+ @NotNull
+ private var dbDirPath: String = _
+
+ @JsonProperty("verifyChecksums")
+ def verifyChecksums: Boolean = true
+
+ @JsonProperty("performParanoidChecks")
+ def performParanoidChecks: Boolean = false
+
+ @JsonProperty("dbDirPath")
+ def getDbDirPath: String = dbDirPath
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
new file mode 100644
index 0000000..aa6694a
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/db/MetadataDatasource.scala
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.db
+
+trait MetadataDatasource {
+
+ type Key = Array[Byte]
+ type Value = Array[Byte]
+
+ /**
+ * Idempotent call at the start of the application to initialize db
+ */
+ def initialize(): Unit
+
+ /**
+ * This function obtains the associated value to a key. It requires the (key-value) pair to be in the DataSource
+ *
+ * @param key
+ * @return the value associated with the passed key.
+ */
+ def apply(key: Key): Value = get(key).get
+
+ /**
+ * This function obtains the associated value to a key, if there exists one.
+ *
+ * @param key
+ * @return the value associated with the passed key.
+ */
+ def get(key: Key): Option[Value]
+
+
+ /**
+ * This function associates a key to a value, overwriting if necessary
+ */
+ def put(key: Key, value: Value): Unit
+
+ /**
+ * Delete key from the db
+ */
+ def delete(key: Key): Unit
+
+ /**
+ * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
+ *
+ * @param toRemove which includes all the keys to be removed from the DataSource.
+ * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
+ * If a key is already in the DataSource its value will be updated.
+ * @return the new DataSource after the removals and insertions were done.
+ */
+ def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit
+
+ /**
+ * This function closes the DataSource, without deleting the files used by it.
+ */
+ def close(): Unit
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
new file mode 100644
index 0000000..6d185bf
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/main/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDatasource.scala
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.metrics.adservice.leveldb
+
+import java.io.File
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
+import org.apache.ambari.metrics.adservice.db.MetadataDatasource
+import org.iq80.leveldb.{DB, Options, WriteOptions}
+import org.iq80.leveldb.impl.Iq80DBFactory
+
+import com.google.inject.Singleton
+
+@Singleton
+class LevelDBDataSource(appConfig: AnomalyDetectionAppConfig) extends MetadataDatasource {
+
+ private var db: DB = _
+ @volatile var isInitialized: Boolean = false
+
+ override def initialize(): Unit = {
+ if (isInitialized) return
+
+ val configuration: MetricDefinitionDBConfiguration = appConfig.getMetricDefinitionDBConfiguration
+
+ db = createDB(new LevelDbConfig {
+ override val createIfMissing: Boolean = true
+ override val verifyChecksums: Boolean = configuration.verifyChecksums
+ override val paranoidChecks: Boolean = configuration.performParanoidChecks
+ override val path: String = configuration.getDbDirPath
+ })
+ isInitialized = true
+ }
+
+ private def createDB(levelDbConfig: LevelDbConfig): DB = {
+ import levelDbConfig._
+
+ val options = new Options()
+ .createIfMissing(createIfMissing)
+ .paranoidChecks(paranoidChecks) // raise an error as soon as it detects an internal corruption
+ .verifyChecksums(verifyChecksums) // force checksum verification of all data that is read from the file system on behalf of a particular read
+
+ Iq80DBFactory.factory.open(new File(path), options)
+ }
+
+ override def close(): Unit = {
+ db.close()
+ }
+
+ /**
+ * This function obtains the associated value to a key, if there exists one.
+ *
+ * @param key
+ * @return the value associated with the passed key.
+ */
+ override def get(key: Key): Option[Value] = Option(db.get(key))
+
+ /**
+ * This function updates the DataSource by deleting, updating and inserting new (key-value) pairs.
+ *
+ * @param toRemove which includes all the keys to be removed from the DataSource.
+ * @param toUpsert which includes all the (key-value) pairs to be inserted into the DataSource.
+ * If a key is already in the DataSource its value will be updated.
+ */
+ override def update(toRemove: Seq[Key], toUpsert: Seq[(Key, Value)]): Unit = {
+ val batch = db.createWriteBatch()
+ toRemove.foreach { key => batch.delete(key) }
+ toUpsert.foreach { item => batch.put(item._1, item._2) }
+ db.write(batch, new WriteOptions())
+ }
+
+ override def put(key: Key, value: Value): Unit = {
+ db.put(key, value)
+ }
+
+ override def delete(key: Key): Unit = {
+ db.delete(key)
+ }
+}
+
+trait LevelDbConfig {
+ val createIfMissing: Boolean
+ val paranoidChecks: Boolean
+ val verifyChecksums: Boolean
+ val path: String
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/dfa64cb9/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
new file mode 100644
index 0000000..2ddb7b8
--- /dev/null
+++ b/ambari-metrics/ambari-metrics-anomaly-detection-service/src/test/scala/org/apache/ambari/metrics/adservice/leveldb/LevelDBDataSourceTest.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.metrics.adservice.leveldb
+
+import java.io.File
+
+import org.apache.ambari.metrics.adservice.app.AnomalyDetectionAppConfig
+import org.apache.ambari.metrics.adservice.configuration.MetricDefinitionDBConfiguration
+import org.iq80.leveldb.util.FileUtils
+import org.mockito.Mockito.when
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+import org.scalatest.mockito.MockitoSugar
+
+class LevelDBDataSourceTest extends FunSuite with BeforeAndAfter with Matchers with MockitoSugar {
+
+ var db: LevelDBDataSource = _
+ var file : File = FileUtils.createTempDir("adservice-leveldb-test")
+
+ before {
+ val appConfig: AnomalyDetectionAppConfig = mock[AnomalyDetectionAppConfig]
+ val mdConfig : MetricDefinitionDBConfiguration = mock[MetricDefinitionDBConfiguration]
+
+ when(appConfig.getMetricDefinitionDBConfiguration).thenReturn(mdConfig)
+ when(mdConfig.verifyChecksums).thenReturn(true)
+ when(mdConfig.performParanoidChecks).thenReturn(false)
+ when(mdConfig.getDbDirPath).thenReturn(file.getAbsolutePath)
+
+ db = new LevelDBDataSource(appConfig)
+ db.initialize()
+ }
+
+ test("testOperations") {
+ db.put("Hello".getBytes(), "World".getBytes())
+ assert(db.get("Hello".getBytes()).get.sameElements("World".getBytes()))
+ db.update(Seq("Hello".getBytes()), Seq(("Hello".getBytes(), "Mars".getBytes())))
+ assert(db.get("Hello".getBytes()).get.sameElements("Mars".getBytes()))
+ }
+
+ after {
+ FileUtils.deleteRecursively(file)
+ }
+}