You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/10/02 01:18:13 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1161: Transactional State [2/5]: Added a ChangelogSSPIterator API

prateekm commented on a change in pull request #1161: Transactional State [2/5]: Added a ChangelogSSPIterator API
URL: https://github.com/apache/samza/pull/1161#discussion_r330339981
 
 

 ##########
 File path: samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
 ##########
 @@ -109,42 +105,89 @@ class KeyValueStorageEngine[K, V](
   }
 
   /**
-   * Restore the contents of this key/value store from the change log,
-   * batching updates to underlying raw store to skip wrapping functions for efficiency.
+   * Restore the contents of this key/value store from the change log, batching updates to underlying raw store
+   * for efficiency.
+   *
+   * With transactional state disabled, iterator mode will always be 'restore'. With transactional state enabled,
+   * iterator mode may switch from 'restore' to 'trim' at some point, but will not switch back to 'restore'.
    */
-  def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
+  def restore(iterator: ChangelogSSPIterator) {
     info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
+    var restoredMessages = 0
+    var restoredBytes = 0
+    var trimmedMessages = 0
+    var trimmedBytes = 0
 
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
+    var lastBatchFlushed = false
 
-    for (envelope <- envelopes.asScala) {
+    while(iterator.hasNext) {
+      val envelope = iterator.next()
       val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
       val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
-
-      batch.add(new Entry(keyBytes, valBytes))
-
-      if (batch.size >= batchSize) {
-        doPutAll(rawStore, batch)
-        batch.clear()
-      }
-
-      if (valBytes != null) {
-        metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
+      val mode = iterator.getMode
+
+      if (mode.equals(ChangelogSSPIterator.Mode.RESTORE)) {
+        batch.add(new Entry(keyBytes, valBytes))
+
+        if (batch.size >= batchSize) {
+          doPutAll(rawStore, batch)
+          batch.clear()
+        }
+
+        // update metrics
+        restoredMessages += 1
+        restoredBytes += keyBytes.length
+        if (valBytes != null) restoredBytes += valBytes.length
+        metrics.restoredMessagesGauge.set(restoredMessages)
+        metrics.restoredBytesGauge.set(restoredBytes)
+
+        // log progress every million messages
+        if (restoredMessages % 1000000 == 0) {
+          info(restoredMessages + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
+        }
+      } else {
+        // first write any open restore batches to store
+        if (!lastBatchFlushed) {
+          info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
+          if (batch.size > 0) {
+            doPutAll(rawStore, batch)
+          }
+          lastBatchFlushed = true
+        }
+
+        // then overwrite the value to be trimmed with its current store value
+        val currentValBytes = rawStore.get(keyBytes)
+        val changelogMessage = new OutgoingMessageEnvelope(
+          changelogSSP.getSystemStream, changelogSSP.getPartition, keyBytes, currentValBytes)
+        changelogCollector.send(changelogMessage)
+
+        // update metrics
+        trimmedMessages += 1
+        trimmedBytes += keyBytes.length
+        if (currentValBytes != null) trimmedBytes += currentValBytes.length
+        metrics.trimmedMessagesGauge.set(trimmedMessages)
+        metrics.trimmedBytesGauge.set(trimmedBytes)
+
+        // log progress every hundred thousand messages
+        if (trimmedMessages % 100000 == 0) {
+          info(restoredMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + "...")
+        }
       }
-      metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
-      metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
-      count += 1
+    }
 
-      if (count % 1000000 == 0) {
-        info(count + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
+    // if the last batch isn't flushed yet (e.g., for non transactional state or no messages to trim), flush it now
+    if (!lastBatchFlushed) {
+      info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
+      if (batch.size > 0) {
+        doPutAll(rawStore, batch)
       }
+      lastBatchFlushed = true
     }
+    info(restoredMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + ".")
 
-    info(count + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
-
-    if (batch.size > 0) {
-      doPutAll(rawStore, batch)
-    }
+    // flush the store and the changelog producer
 
 Review comment:
   We do a task commit after the restore is complete and before we start processing. That commit flushes all producers (including the changelog producer), so this is not a blocker.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services