You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/11 18:56:07 UTC
samza git commit: SAMZA-1900: Add restore logging info
Repository: samza
Updated Branches:
refs/heads/master da88096a0 -> 3c78e06ac
SAMZA-1900: Add restore logging info
prateekm, let me know if there are other places where logging should also be improved in this patch.
Author: Daniel Chen <dc...@linkedin.com>
Author: Daniel Chen <xr...@uwaterloo.ca>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #653 from dxichen/add-restore-logging-info
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3c78e06a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3c78e06a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3c78e06a
Branch: refs/heads/master
Commit: 3c78e06ac0913dd35866a9e82d8cd7d9d2cc758a
Parents: da88096
Author: Daniel Chen <dc...@linkedin.com>
Authored: Thu Oct 11 11:56:05 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 11 11:56:05 2018 -0700
----------------------------------------------------------------------
.../main/java/org/apache/samza/storage/StorageEngine.java | 4 ++--
.../org/apache/samza/storage/TaskStorageManager.scala | 2 +-
.../apache/samza/storage/kv/RocksDbKeyValueStore.scala | 2 +-
.../org/apache/samza/storage/kv/AccessLoggedStore.scala | 6 +++---
.../storage/kv/BaseKeyValueStorageEngineFactory.scala | 2 +-
.../apache/samza/storage/kv/KeyValueStorageEngine.scala | 10 +++++++---
.../samza/storage/kv/TestKeyValueStorageEngine.scala | 6 +++++-
7 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index a83f8b3..4e6950a 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -25,7 +25,7 @@ import org.apache.samza.system.IncomingMessageEnvelope;
/**
* A storage engine for managing state maintained by a stream processor.
- *
+ *
* <p>
* This interface does not specify any query capabilities, which, of course,
* would be query engine specific. Instead it just specifies the minimum
@@ -39,7 +39,7 @@ public interface StorageEngine {
* Restore the content of this StorageEngine from the changelog. Messages are
* provided in one {@link java.util.Iterator} and not deserialized for
* efficiency, allowing the implementation to optimize replay, if possible.
- *
+ *
* @param envelopes
* An iterator of envelopes that the storage engine can read from to
* restore its state on startup.
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 90fdc19..deb69e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -203,7 +203,7 @@ class TaskStorageManager(
}
private def restoreStores() {
- debug("Restoring stores.")
+ debug("Restoring stores for task: %s." format taskName.getTaskName)
for ((storeName, store) <- taskStoresToRestore) {
if (changeLogSystemStreams.contains(storeName)) {
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 836dab4..b7baede 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -68,7 +68,7 @@ object RocksDbKeyValueStore extends Logging {
try {
val rocksDb =
if (useTTL) {
- info("Opening RocksDB store with TTL value: %s" format ttl)
+ info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl))
TtlDB.open(options, dir.toString, ttl.toInt, false)
} else {
RocksDB.open(options, dir.toString)
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 39136db..78a7b0b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -100,16 +100,16 @@ class AccessLoggedStore[K, V](
}
def close(): Unit = {
- trace("Closing accessLogged store.")
+ trace("Closing accessLogged store: %s." format storeName)
store.close
}
def flush(): Unit = {
- trace("Flushing store.")
+ trace("Flushing store: %s." format storeName)
store.flush
- trace("Flushed store.")
+ trace("Flushed store: %s." format storeName)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index d962e93..e1e7642 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -154,7 +154,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
}
}
- new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore,
+ new KeyValueStorageEngine(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 963dce4..0434199 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -19,6 +19,8 @@
package org.apache.samza.storage.kv
+import java.io.File
+
import org.apache.samza.util.Logging
import org.apache.samza.storage.{StorageEngine, StoreProperties}
import org.apache.samza.system.IncomingMessageEnvelope
@@ -32,6 +34,8 @@ import scala.collection.JavaConverters._
* This implements both the key/value interface and the storage engine interface.
*/
class KeyValueStorageEngine[K, V](
+ storeName: String,
+ storeDir: File,
storeProperties: StoreProperties,
wrapperStore: KeyValueStore[K, V],
rawStore: KeyValueStore[Array[Byte], Array[Byte]],
@@ -104,7 +108,7 @@ class KeyValueStorageEngine[K, V](
* batching updates to underlying raw store to skip wrapping functions for efficiency.
*/
def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
- info("Restoring entries for store " + metrics.storeName)
+ info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
@@ -132,11 +136,11 @@ class KeyValueStorageEngine[K, V](
count += 1
if (count % 1000000 == 0) {
- info(count + " entries restored...")
+ info(count + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
}
}
- info(count + " total entries restored.")
+ info(count + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
if (batch.size > 0) {
doPutAll(rawStore, batch)
http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index f0c254f..8806a81 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -19,9 +19,11 @@
package org.apache.samza.storage.kv
+import java.io.File
import java.util.Arrays
import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
import org.apache.samza.storage.StoreProperties
import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
import org.junit.Assert._
@@ -37,9 +39,11 @@ class TestKeyValueStorageEngine {
def setup() {
val wrapperKv = new MockKeyValueStore()
val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
+ val storeName = "test-storeName"
+ val storeDir = mock(classOf[File])
val properties = mock(classOf[StoreProperties])
metrics = new KeyValueStorageEngineMetrics
- engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
+ engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
}
@After