You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/10/11 18:56:07 UTC

samza git commit: SAMZA-1900: Add restore logging info

Repository: samza
Updated Branches:
  refs/heads/master da88096a0 -> 3c78e06ac


SAMZA-1900: Add restore logging info

prateekm let me know if there are other places that logging should also be improved in this patch.

Author: Daniel Chen <dc...@linkedin.com>
Author: Daniel Chen <xr...@uwaterloo.ca>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #653 from dxichen/add-restore-logging-info


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3c78e06a
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3c78e06a
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3c78e06a

Branch: refs/heads/master
Commit: 3c78e06ac0913dd35866a9e82d8cd7d9d2cc758a
Parents: da88096
Author: Daniel Chen <dc...@linkedin.com>
Authored: Thu Oct 11 11:56:05 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Thu Oct 11 11:56:05 2018 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/storage/StorageEngine.java |  4 ++--
 .../org/apache/samza/storage/TaskStorageManager.scala     |  2 +-
 .../apache/samza/storage/kv/RocksDbKeyValueStore.scala    |  2 +-
 .../org/apache/samza/storage/kv/AccessLoggedStore.scala   |  6 +++---
 .../storage/kv/BaseKeyValueStorageEngineFactory.scala     |  2 +-
 .../apache/samza/storage/kv/KeyValueStorageEngine.scala   | 10 +++++++---
 .../samza/storage/kv/TestKeyValueStorageEngine.scala      |  6 +++++-
 7 files changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index a83f8b3..4e6950a 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -25,7 +25,7 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
- * 
+ *
  * <p>
  * This interface does not specify any query capabilities, which, of course,
  * would be query engine specific. Instead it just specifies the minimum
@@ -39,7 +39,7 @@ public interface StorageEngine {
    * Restore the content of this StorageEngine from the changelog. Messages are
    * provided in one {@link java.util.Iterator} and not deserialized for
    * efficiency, allowing the implementation to optimize replay, if possible.
-   * 
+   *
    * @param envelopes
    *          An iterator of envelopes that the storage engine can read from to
    *          restore its state on startup.

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 90fdc19..deb69e1 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -203,7 +203,7 @@ class TaskStorageManager(
   }
 
   private def restoreStores() {
-    debug("Restoring stores.")
+    debug("Restoring stores for task: %s." format taskName.getTaskName)
 
     for ((storeName, store) <- taskStoresToRestore) {
       if (changeLogSystemStreams.contains(storeName)) {

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 836dab4..b7baede 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -68,7 +68,7 @@ object RocksDbKeyValueStore extends Logging {
     try {
       val rocksDb =
         if (useTTL) {
-          info("Opening RocksDB store with TTL value: %s" format ttl)
+          info("Opening RocksDB store: %s in path: %s with TTL value: %s" format (storeName, dir.toString, ttl))
           TtlDB.open(options, dir.toString, ttl.toInt, false)
         } else {
           RocksDB.open(options, dir.toString)

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 39136db..78a7b0b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -100,16 +100,16 @@ class AccessLoggedStore[K, V](
   }
 
   def close(): Unit = {
-    trace("Closing accessLogged store.")
+    trace("Closing accessLogged store: %s." format storeName)
 
     store.close
   }
 
   def flush(): Unit = {
-    trace("Flushing store.")
+    trace("Flushing store: %s." format storeName)
 
     store.flush
-    trace("Flushed store.")
+    trace("Flushed store: %s." format storeName)
   }
 
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index d962e93..e1e7642 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -154,7 +154,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       }
     }
 
-    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore,
+    new KeyValueStorageEngine(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
       keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 963dce4..0434199 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.storage.kv
 
+import java.io.File
+
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StorageEngine, StoreProperties}
 import org.apache.samza.system.IncomingMessageEnvelope
@@ -32,6 +34,8 @@ import scala.collection.JavaConverters._
  * This implements both the key/value interface and the storage engine interface.
  */
 class KeyValueStorageEngine[K, V](
+  storeName: String,
+  storeDir: File,
   storeProperties: StoreProperties,
   wrapperStore: KeyValueStore[K, V],
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
@@ -104,7 +108,7 @@ class KeyValueStorageEngine[K, V](
    * batching updates to underlying raw store to notAValidEvent wrapping functions for efficiency.
    */
   def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
-    info("Restoring entries for store " + metrics.storeName)
+    info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
 
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
 
@@ -132,11 +136,11 @@ class KeyValueStorageEngine[K, V](
       count += 1
 
       if (count % 1000000 == 0) {
-        info(count + " entries restored...")
+        info(count + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
       }
     }
 
-    info(count + " total entries restored.")
+    info(count + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
 
     if (batch.size > 0) {
       doPutAll(rawStore, batch)

http://git-wip-us.apache.org/repos/asf/samza/blob/3c78e06a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index f0c254f..8806a81 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -19,9 +19,11 @@
 
 package org.apache.samza.storage.kv
 
+import java.io.File
 import java.util.Arrays
 
 import org.apache.samza.Partition
+import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
 import org.junit.Assert._
@@ -37,9 +39,11 @@ class TestKeyValueStorageEngine {
   def setup() {
     val wrapperKv = new MockKeyValueStore()
     val rawKv = mock(classOf[KeyValueStore[Array[Byte], Array[Byte]]])
+    val storeName = "test-storeName"
+    val storeDir = mock(classOf[File])
     val properties = mock(classOf[StoreProperties])
     metrics = new KeyValueStorageEngineMetrics
-    engine = new KeyValueStorageEngine[String, String](properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
+    engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
   }
 
   @After