You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/05/11 19:56:35 UTC
samza git commit: SAMZA-1174; Profiling state store performance
Repository: samza
Updated Branches:
refs/heads/master 46263677d -> 014a59c68
SAMZA-1174; Profiling state store performance
This is a commit for [SAMZA-1174](https://issues.apache.org/jira/browse/SAMZA-1174). This commit involves gathering a log of the operations (read, write, delete, etc.) performed on the state store and publishing them to a Kafka topic. This log is named the "Access Log"; it behaves similarly to the changelog, but records access information instead of state changes.
Author: jmehar2 <jm...@illinois.edu>
Author: Jayasi Mehar <ja...@gmail.com>
Author: s-noghabi <ab...@illinois.edu>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>, Jagadish V <ja...@apache.org>
Closes #132 from s-noghabi/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/014a59c6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/014a59c6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/014a59c6
Branch: refs/heads/master
Commit: 014a59c68b792c8e63639c6f16257c41f0f4e0c6
Parents: 4626367
Author: jmehar2 <jm...@illinois.edu>
Authored: Thu May 11 12:56:31 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Thu May 11 12:56:31 2017 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/StorageConfig.scala | 18 +++
.../samza/coordinator/JobModelManager.scala | 24 +++
.../apache/samza/serializers/SerdeManager.scala | 13 +-
.../samza/storage/kv/AccessLogMessage.scala | 44 ++++++
.../samza/storage/kv/AccessLoggedStore.scala | 155 +++++++++++++++++++
.../kv/BaseKeyValueStorageEngineFactory.scala | 9 +-
6 files changed, 258 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index 8dbf739..0e3d568 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -35,6 +35,11 @@ object StorageConfig {
val CHANGELOG_SYSTEM = "job.changelog.system"
val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms"
val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1)
+ val ACCESSLOG_STREAM_SUFFIX = "access-log"
+ val ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio"
+ val ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled"
+ val DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50
+
implicit def Config2Storage(config: Config) = new StorageConfig(config)
}
@@ -45,11 +50,24 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name)
def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
+ def getAccessLogEnabled(storeName: String) = {
+ getBoolean(ACCESSLOG_ENABLED format storeName, false)
+ }
+
def getChangelogStream(name: String) = {
val javaStorageConfig = new JavaStorageConfig(config)
Option(javaStorageConfig.getChangelogStream(name))
}
+ //Returns the accesslog stream name given a changelog stream name
+ def getAccessLogStream(changeLogStream: String) = {
+ changeLogStream + "-" + ACCESSLOG_STREAM_SUFFIX
+ }
+
+ def getAccessLogSamplingRatio(storeName: String) = {
+ getInt(ACCESSLOG_SAMPLING_RATIO format storeName, DEFAULT_ACCESSLOG_SAMPLING_RATIO)
+ }
+
def getChangeLogDeleteRetentionInMs(storeName: String) = {
getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index dda0b6b..353e297 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
@@ -48,6 +48,7 @@ import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.SystemStreamPartitionMatcher
import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.StreamSpec
import org.apache.samza.util.Logging
import org.apache.samza.util.Util
import org.apache.samza.Partition
@@ -138,6 +139,7 @@ object JobModelManager extends Logging {
changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+ createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions)
jobModelManager
}
@@ -298,6 +300,28 @@ object JobModelManager extends Logging {
}
}
+ private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = {
+ val changeLogSystemStreams = config
+ .getStoreNames
+ .filter(config.getChangelogStream(_).isDefined)
+ .map(name => (name, config.getChangelogStream(name).get)).toMap
+ .mapValues(Util.getSystemStreamFromNames(_))
+
+ for ((storeName, systemStream) <- changeLogSystemStreams) {
+ val accessLog = config.getAccessLogEnabled(storeName)
+ if (accessLog) {
+ val systemAdmin = Util.getObj[SystemFactory](config
+ .getSystemFactory(systemStream.getSystem)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
+ ).getAdmin(systemStream.getSystem, config)
+
+ val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream),
+ config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions)
+ systemAdmin.createStream(accessLogSpec)
+ }
+ }
+ }
+
private def getSystemNames(config: Config) = config.getSystemNames.toSet
}
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
index 066d894..4540bce 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
@@ -24,6 +24,7 @@ import org.apache.samza.config.SerializerConfig
import org.apache.samza.system.SystemStream
import org.apache.samza.system.OutgoingMessageEnvelope
import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.config.StorageConfig
class SerdeManager(
serdes: Map[String, Serde[Object]] = Map(),
@@ -38,7 +39,8 @@ class SerdeManager(
.toBytes(obj)
def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
- val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+ val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)
+ || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getKey
} else if (envelope.getKeySerializerName != null) {
@@ -55,7 +57,8 @@ class SerdeManager(
envelope.getKey
}
- val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+ val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)
+ || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getMessage
} else if (envelope.getMessageSerializerName != null) {
@@ -90,7 +93,8 @@ class SerdeManager(
.fromBytes(bytes)
def fromBytes(envelope: IncomingMessageEnvelope) = {
- val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+ val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
+ || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getKey
} else if (systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) {
@@ -104,7 +108,8 @@ class SerdeManager(
envelope.getKey
}
- val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+ val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
+ || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
// If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
envelope.getMessage
} else if (systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) {
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala
new file mode 100644
index 0000000..dde5599
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.Serializable
+import java.io.ByteArrayOutputStream
+import java.io.ObjectOutputStream
+import java.util.ArrayList
+
+class AccessLogMessage(val DBOperation: Int,
+ val duration: Long,
+ val keys: ArrayList[Array[Byte]],
+ val timestamp: Long = System.currentTimeMillis()
+ ) extends Serializable {
+
+
+ def serialize() : Array[Byte] = {
+ val byteStream = new ByteArrayOutputStream()
+ val outputStream = new ObjectOutputStream(byteStream)
+ outputStream.writeObject(this)
+ outputStream.close
+ val obj: Array[Byte] = byteStream.toByteArray
+ byteStream.close
+ return obj
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
new file mode 100644
index 0000000..c21c9a6
--- /dev/null
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+
+import java.util
+import org.apache.samza.config.StorageConfig
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.util.Logging
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, SystemStreamPartition}
+import org.apache.samza.serializers._
+
+class AccessLoggedStore[K, V](
+ val store: KeyValueStore[K, V],
+ val collector: MessageCollector,
+ val changelogSystemStreamPartition: SystemStreamPartition,
+ val storageConfig: StorageConfig,
+ val storeName: String,
+ val keySerde: Serde[K]) extends KeyValueStore[K, V] with Logging {
+
+ object DBOperation extends Enumeration {
+ type DBOperation = Int
+ val READ = 1
+ val WRITE = 2
+ val DELETE = 3
+ val RANGE = 4
+ }
+
+ val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
+ val systemStream = new SystemStream(changelogSystemStreamPartition.getSystemStream.getSystem, streamName)
+ val partitionId: Int = changelogSystemStreamPartition.getPartition.getPartitionId
+ val serializer = new LongSerde()
+ val samplingRatio = storageConfig.getAccessLogSamplingRatio(storeName)
+ val rng = scala.util.Random
+
+ def get(key: K): V = {
+ val list = new util.ArrayList[Array[Byte]]
+ list.add(toBytesOrNull(key))
+ logAccess(DBOperation.READ, list, store.get(key))
+ }
+
+ def getAll(keys: util.List[K]): util.Map[K, V] = {
+ logAccess(DBOperation.READ, serializeKeys(keys), store.getAll(keys))
+ }
+
+ def put(key: K, value: V): Unit = {
+ val list = new util.ArrayList[Array[Byte]]
+ list.add(toBytesOrNull(key))
+ logAccess(DBOperation.WRITE, list, store.put(key, value))
+ }
+
+ def putAll(entries: util.List[Entry[K, V]]): Unit = {
+ logAccess(DBOperation.WRITE, serializeKeysFromEntries(entries), store.putAll(entries))
+ }
+
+ def delete(key: K): Unit = {
+ val list = new util.ArrayList[Array[Byte]]
+ list.add(toBytesOrNull(key))
+ logAccess(DBOperation.DELETE, list, store.delete(key))
+ }
+
+ def deleteAll(keys: util.List[K]): Unit = {
+ logAccess(DBOperation.DELETE, serializeKeys(keys), store.deleteAll(keys))
+ }
+
+ def range(from: K, to: K): KeyValueIterator[K, V] = {
+ val list : util.ArrayList[K] = new util.ArrayList[K]()
+ list.add(from)
+ list.add(to)
+ logAccess(DBOperation.RANGE, serializeKeys(list), store.range(from, to))
+ }
+
+ def all(): KeyValueIterator[K, V] = {
+ store.all()
+ }
+
+ def close(): Unit = {
+ trace("Closing accessLogged store.")
+
+ store.close
+ }
+
+ def flush(): Unit = {
+ trace("Flushing store.")
+
+ store.flush
+ trace("Flushed store.")
+ }
+
+
+ def serializeKeys(keys: util.List[K]): util.ArrayList[Array[Byte]] = {
+ val keysInBytes = new util.ArrayList[Array[Byte]]
+ val iter = keys.iterator
+ if (iter != null)
+ while(iter.hasNext()) {
+ val entry = iter.next()
+ keysInBytes.add(toBytesOrNull(entry))
+ }
+
+ keysInBytes
+ }
+
+ def serializeKeysFromEntries(list: util.List[Entry[K, V]]): util.ArrayList[Array[Byte]] = {
+ val keysInBytes = new util.ArrayList[Array[Byte]]
+ val iter = list.iterator
+ if (iter != null)
+ while(iter.hasNext()) {
+ val entry = iter.next().getKey
+ keysInBytes.add(toBytesOrNull(entry))
+ }
+
+ keysInBytes
+ }
+
+ private def logAccess[R](dBOperation: Int, keys: util.ArrayList[Array[Byte]],
+ block: => R):R = {
+ val startTimeNs = System.nanoTime()
+ val result = block
+ val endTimeNs = System.nanoTime()
+ if (rng.nextInt() < samplingRatio) {
+ val duration = endTimeNs - startTimeNs
+ val timeStamp = System.currentTimeMillis()
+ val message = new AccessLogMessage(dBOperation, duration, keys)
+ collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, serializer.toBytes(timeStamp), message.serialize()))
+ }
+
+ result
+ }
+
+ def toBytesOrNull(key: K): Array[Byte] = {
+ if (key == null) {
+ return null
+ }
+ val bytes = keySerde.toBytes(key)
+ bytes
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/014a59c6/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index 8ffc817..e3a2970 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -82,6 +82,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
val storeFactory = storageConfig.get("factory")
var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
+ val accessLog = storageConfig.getBoolean("accesslog.enabled", false)
if (storeFactory == null) {
throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!")
@@ -129,8 +130,14 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
serialized
}
+ val maybeAccessLoggedStore = if (accessLog) {
+ new AccessLoggedStore(maybeCachedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
+ } else {
+ maybeCachedStore
+ }
+
// wrap with null value checking
- val nullSafeStore = new NullSafeKeyValueStore(maybeCachedStore)
+ val nullSafeStore = new NullSafeKeyValueStore(maybeAccessLoggedStore)
// create the storage engine and return
// TODO: Decide if we should use raw bytes when restoring