You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/02 01:56:19 UTC

[samza] branch master updated: Transactional State [2/5]: Added a ChangelogSSPIterator API

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e5ede6  Transactional State [2/5]: Added a ChangelogSSPIterator API
3e5ede6 is described below

commit 3e5ede6047d7a2a32ac37ff937a08d7b927814c4
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 1 18:56:14 2019 -0700

    Transactional State [2/5]: Added a ChangelogSSPIterator API
    
    This PR changes the KeyValueStorageEngine restore API and implementation to take a new ChangelogSSPIterator instead of a plain Iterator.
    
    The new ChangelogSSPIterator is similar to the existing SystemStreamPartitionIterator with the following differences:
    
    - It has 2 modes: Restore and Trim.
    - It takes an endOffset during construction. The mode changes from Restore to Trim during iteration if the current message offset is greater than the end offset (and trimming is enabled).
    - It does not check for end of stream, since that isn't applicable to changelog topics.
    To support transactional state, we only restore changelog messages up to the changelog SSP offset in the checkpoint topic. Any messages after this 'checkpointed changelog offset' are trimmed by overwriting them with the current store value. When used in conjunction with an appropriate 'min.compaction.lag.ms' configuration for the Kafka changelog topic, this ensures that on container restart any store contents are consistent with the last input checkpoints and do not reflect any ne [...]
---
 .../org/apache/samza/storage/StorageEngine.java    |   5 +-
 .../apache/samza/system/ChangelogSSPIterator.java  | 100 ++++++++++++++++++++
 .../samza/storage/ContainerStorageManager.java     |  10 +-
 .../apache/samza/storage/MockStorageEngine.java    |   8 +-
 .../samza/storage/kv/KeyValueStorageEngine.scala   | 105 +++++++++++++++------
 .../storage/kv/TestKeyValueStorageEngine.scala     |  26 +++--
 6 files changed, 204 insertions(+), 50 deletions(-)

diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 58c6368..804a250 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -19,11 +19,10 @@
 
 package org.apache.samza.storage;
 
-import java.util.Iterator;
 
 import java.nio.file.Path;
 import java.util.Optional;
-import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.ChangelogSSPIterator;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -46,7 +45,7 @@ public interface StorageEngine {
    *          An iterator of envelopes that the storage engine can read from to
    *          restore its state on startup.
    */
-  void restore(Iterator<IncomingMessageEnvelope> envelopes);
+  void restore(ChangelogSSPIterator envelopes);
 
   /**
    * Flush any cached messages
diff --git a/samza-api/src/main/java/org/apache/samza/system/ChangelogSSPIterator.java b/samza-api/src/main/java/org/apache/samza/system/ChangelogSSPIterator.java
new file mode 100644
index 0000000..bf62c8a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/system/ChangelogSSPIterator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+
+/**
+ * Iterates over the messages in a single changelog {@link SystemStreamPartition}, polling the given
+ * {@link SystemConsumer} for more messages whenever no messages are buffered locally.
+ *
+ * <p>The iterator starts in {@link Mode#RESTORE} mode. If trimming is enabled, it switches to
+ * {@link Mode#TRIM} once it returns a message whose offset is greater than the end offset (or
+ * immediately, if the end offset is null), and never switches back. Callers should check
+ * {@link #getMode()} after each call to {@link #next()} to decide how to handle that message.
+ *
+ * <p>Unlike {@code SystemStreamPartitionIterator}, this iterator does not check for end of stream.
+ */
+public class ChangelogSSPIterator {
+  /** Whether the most recently returned message should be restored or trimmed. */
+  public enum Mode {
+    RESTORE,
+    TRIM
+  }
+
+  private final SystemConsumer systemConsumer;
+  private final String endOffset;
+  private final SystemAdmin admin;
+  private final Set<SystemStreamPartition> fetchSet;
+  private final boolean trimEnabled;
+  private final Queue<IncomingMessageEnvelope> peeks;
+  private Mode mode = Mode.RESTORE;
+
+  /**
+   * Creates an iterator over a single changelog partition.
+   *
+   * @param systemConsumer the consumer to poll for changelog messages
+   * @param systemStreamPartition the changelog partition to read
+   * @param endOffset the last offset (inclusive) to restore when trimming is enabled; null means every
+   *                  message, from the starting offset to the head, is trimmed. Ignored if trimming is disabled.
+   * @param admin used to compare message offsets with {@code endOffset}; required only when trimming is
+   *              enabled and {@code endOffset} is non-null
+   * @param trimEnabled whether messages after {@code endOffset} are returned in {@link Mode#TRIM} mode
+   * @throws NullPointerException if {@code systemConsumer} or {@code systemStreamPartition} is null, or if
+   *         {@code admin} is null when it is required
+   */
+  public ChangelogSSPIterator(SystemConsumer systemConsumer,
+      SystemStreamPartition systemStreamPartition, String endOffset, SystemAdmin admin, boolean trimEnabled) {
+    this.systemConsumer = Objects.requireNonNull(systemConsumer, "systemConsumer");
+    this.endOffset = endOffset;
+    this.admin = admin;
+    this.trimEnabled = trimEnabled;
+    if (trimEnabled) {
+      if (endOffset == null) {
+        // nothing to restore: every message is trimmed
+        mode = Mode.TRIM;
+      } else {
+        // fail fast here rather than with a NullPointerException part way through a restore
+        Objects.requireNonNull(admin, "admin is required to compare offsets when trimming is enabled");
+      }
+    }
+    this.fetchSet = new HashSet<>();
+    this.fetchSet.add(Objects.requireNonNull(systemStreamPartition, "systemStreamPartition"));
+    this.peeks = new ArrayDeque<>();
+  }
+
+  /**
+   * Returns whether another changelog message is available, first polling the consumer (with
+   * {@link SystemConsumer#BLOCK_ON_OUTSTANDING_MESSAGES}) if no messages are buffered locally.
+   */
+  public boolean hasNext() {
+    refresh();
+    return !peeks.isEmpty();
+  }
+
+  /**
+   * Returns the next changelog message. If trimming is enabled and the message offset is greater than the
+   * end offset, this iterator switches to {@link Mode#TRIM} before returning.
+   *
+   * @throws NoSuchElementException if no more messages are available
+   * @throws SamzaException if the message offset cannot be compared with the end offset
+   */
+  public IncomingMessageEnvelope next() {
+    refresh();
+    if (peeks.isEmpty()) {
+      throw new NoSuchElementException();
+    }
+
+    IncomingMessageEnvelope envelope = peeks.poll();
+    // TRIM mode is terminal, so offsets only need to be compared while still restoring
+    if (trimEnabled && mode == Mode.RESTORE && isAfterEndOffset(envelope.getOffset())) {
+      mode = Mode.TRIM;
+    }
+    return envelope;
+  }
+
+  /**
+   * Returns how the message most recently returned by {@link #next()} should be handled. Before the first
+   * call to {@link #next()}, this is {@link Mode#TRIM} if trimming is enabled with a null end offset, and
+   * {@link Mode#RESTORE} otherwise.
+   */
+  public Mode getMode() {
+    return mode;
+  }
+
+  /** Returns true if {@code offset} is past the inclusive end offset, or if there is no end offset. */
+  private boolean isAfterEndOffset(String offset) {
+    if (endOffset == null) {
+      return true;
+    }
+    // offsetComparator returns a nullable Integer; unboxing a null result would throw a NullPointerException
+    Integer comparison = admin.offsetComparator(offset, endOffset);
+    if (comparison == null) {
+      throw new SamzaException(String.format("Offset %s is not comparable with end offset %s for %s",
+          offset, endOffset, fetchSet));
+    }
+    return comparison > 0;
+  }
+
+  /** Refills the local buffer from the consumer once it has been fully drained. */
+  private void refresh() {
+    if (!peeks.isEmpty()) {
+      return;
+    }
+    try {
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes =
+          systemConsumer.poll(fetchSet, SystemConsumer.BLOCK_ON_OUTSTANDING_MESSAGES);
+      for (List<IncomingMessageEnvelope> sspEnvelopes : envelopes.values()) {
+        peeks.addAll(sspEnvelopes);
+      }
+    } catch (InterruptedException e) {
+      // restore the interrupt flag so callers can still observe the interruption
+      Thread.currentThread().interrupt();
+      throw new SamzaException(e);
+    }
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index bc6e8df..d4f7424 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -64,6 +64,7 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeManager;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.ChangelogSSPIterator;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.StreamSpec;
@@ -76,7 +77,6 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.system.SystemStreamPartitionIterator;
 import org.apache.samza.system.chooser.DefaultChooser;
 import org.apache.samza.system.chooser.MessageChooser;
 import org.apache.samza.system.chooser.RoundRobinChooserFactory;
@@ -1153,11 +1153,13 @@ public class ContainerStorageManager {
       for (String storeName : taskStoresToRestore) {
         SystemConsumer systemConsumer = storeConsumers.get(storeName);
         SystemStream systemStream = changelogSystemStreams.get(storeName);
+        SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem());
 
-        SystemStreamPartitionIterator systemStreamPartitionIterator = new SystemStreamPartitionIterator(systemConsumer,
-            new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()));
+        // TODO HIGH pmaheshw: use actual changelog topic newest offset instead of trimEnabled flag
+        ChangelogSSPIterator changelogSSPIterator = new ChangelogSSPIterator(systemConsumer,
+            new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), null, systemAdmin, false);
 
-        taskStores.get(storeName).restore(systemStreamPartitionIterator);
+        taskStores.get(storeName).restore(changelogSSPIterator);
       }
     }
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index 5d46b43..405abd7 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -23,10 +23,10 @@ import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 
 import java.util.List;
 import java.util.Optional;
+import org.apache.samza.system.ChangelogSSPIterator;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -52,9 +52,10 @@ public class MockStorageEngine implements StorageEngine {
   }
 
+  /** Appends every envelope from {@code envelopes} to {@code incomingMessageEnvelopes}, in iteration order. */
   @Override
-  public void restore(Iterator<IncomingMessageEnvelope> envelopes) {
+  public void restore(ChangelogSSPIterator envelopes) {
     while (envelopes.hasNext()) {
       incomingMessageEnvelopes.add(envelopes.next());
     }
   }
 
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 4a0116b..7ab5268 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -20,16 +20,14 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
-import java.nio.file.Path
-import java.util.Optional
 
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StorageEngine, StoreProperties}
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.system.{ChangelogSSPIterator, OutgoingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.TimerUtil
-
-import scala.collection.JavaConverters._
+import java.nio.file.Path
+import java.util.Optional
 
 /**
  * A key value store.
@@ -48,8 +46,6 @@ class KeyValueStorageEngine[K, V](
   batchSize: Int = 500,
   val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
 
-  var count = 0
-
   /* delegate to underlying store */
   def get(key: K): V = {
     updateTimer(metrics.getNs) {
@@ -109,42 +105,94 @@ class KeyValueStorageEngine[K, V](
   }
 
   /**
-   * Restore the contents of this key/value store from the change log,
-   * batching updates to underlying raw store to notAValidEvent wrapping functions for efficiency.
+   * Restore the contents of this key/value store from the change log, batching updates to the underlying raw store
+   * for efficiency.
+   *
+   * With transactional state disabled, iterator mode will always be 'restore'. With transactional state enabled,
+   * iterator mode may switch from 'restore' to 'trim' at some point, but will not switch back to 'restore'.
+   * Messages read in 'trim' mode are not applied to the store. Instead, the current store value for each such
+   * message's key is sent to the changelog, overwriting the trimmed change.
+   *
+   * @param iterator the changelog messages to restore from, and possibly trim
    */
-  def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
+  def restore(iterator: ChangelogSSPIterator) {
     info("Restoring entries for store: " + storeName + " in directory: " + storeDir.toString)
+    // NOTE(review): Int byte counters overflow past ~2 GiB; consider Longs if the metric gauges accept them
+    var restoredMessages = 0
+    var restoredBytes = 0
+    var trimmedMessages = 0
+    var trimmedBytes = 0
 
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
+    var lastBatchFlushed = false
 
-    for (envelope <- envelopes.asScala) {
+    while (iterator.hasNext) {
+      val envelope = iterator.next()
       val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
       val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
-
-      batch.add(new Entry(keyBytes, valBytes))
-
-      if (batch.size >= batchSize) {
-        doPutAll(rawStore, batch)
-        batch.clear()
-      }
-
-      if (valBytes != null) {
-        metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
+      val mode = iterator.getMode
+
+      if (mode.equals(ChangelogSSPIterator.Mode.RESTORE)) {
+        batch.add(new Entry(keyBytes, valBytes))
+
+        if (batch.size >= batchSize) {
+          doPutAll(rawStore, batch)
+          batch.clear()
+        }
+
+        // update metrics
+        restoredMessages += 1
+        restoredBytes += keyBytes.length
+        if (valBytes != null) restoredBytes += valBytes.length
+        metrics.restoredMessagesGauge.set(restoredMessages)
+        metrics.restoredBytesGauge.set(restoredBytes)
+
+        // log progress every million messages
+        if (restoredMessages % 1000000 == 0) {
+          info(restoredMessages + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
+        }
+      } else {
+        // first write any open restore batches to store
+        if (!lastBatchFlushed) {
+          info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
+          if (batch.size > 0) {
+            doPutAll(rawStore, batch)
+          }
+          lastBatchFlushed = true
+        }
+
+        // then overwrite the value to be trimmed with its current store value
+        val currentValBytes = rawStore.get(keyBytes)
+        val changelogMessage = new OutgoingMessageEnvelope(
+          changelogSSP.getSystemStream, changelogSSP.getPartition, keyBytes, currentValBytes)
+        changelogCollector.send(changelogMessage)
+
+        // update metrics
+        trimmedMessages += 1
+        trimmedBytes += keyBytes.length
+        if (currentValBytes != null) trimmedBytes += currentValBytes.length
+        metrics.trimmedMessagesGauge.set(trimmedMessages)
+        metrics.trimmedBytesGauge.set(trimmedBytes)
+
+        // log progress every hundred thousand messages
+        if (trimmedMessages % 100000 == 0) {
+          info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + "...")
+        }
       }
-      metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
-      metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
-      count += 1
+    }
 
-      if (count % 1000000 == 0) {
-        info(count + " entries restored for store: " + storeName + " in directory: " + storeDir.toString + "...")
+    // if the last batch isn't flushed yet (e.g., for non-transactional state or no messages to trim), flush it now
+    if (!lastBatchFlushed) {
+      info(restoredMessages + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
+      if (batch.size > 0) {
+        doPutAll(rawStore, batch)
       }
+      lastBatchFlushed = true
     }
+    info(trimmedMessages + " entries trimmed for store: " + storeName + " in directory: " + storeDir.toString + ".")
 
-    info(count + " total entries restored for store: " + storeName + " in directory: " + storeDir.toString + ".")
-
-    if (batch.size > 0) {
-      doPutAll(rawStore, batch)
-    }
+    // flush the store and the changelog producer
+    flush() // TODO HIGH pmaheshw: Need a way to flush changelog producers. This only flushes the stores.
   }
 
   def flush() = {
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 5f648f6..79d579a 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -23,9 +23,9 @@ import java.io.File
 import java.util.Arrays
 
 import org.apache.samza.Partition
-import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.system.ChangelogSSPIterator.Mode
+import org.apache.samza.system.{ChangelogSSPIterator, IncomingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -142,12 +142,22 @@ class TestKeyValueStorageEngine {
   @Test
   def testRestoreMetrics(): Unit = {
     val changelogSSP = new SystemStreamPartition("TestSystem", "TestStream", new Partition(0))
-    val changelogEntries = java.util.Arrays asList(
-      new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), Array[Byte](3, 4, 5)),
-      new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), Array[Byte](4, 5, 6)),
-      new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), Array[Byte](5, 6, 7)))
-
-    engine.restore(changelogEntries.iterator())
+    val iterator = mock(classOf[ChangelogSSPIterator])
+    when(iterator.hasNext)
+      .thenReturn(true)
+      .thenReturn(true)
+      .thenReturn(true)
+      .thenReturn(false)
+    when(iterator.next())
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "0", Array[Byte](1, 2), Array[Byte](3, 4, 5)))
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "1", Array[Byte](2, 3), Array[Byte](4, 5, 6)))
+      .thenReturn(new IncomingMessageEnvelope(changelogSSP, "2", Array[Byte](3, 4), Array[Byte](5, 6, 7)))
+    when(iterator.getMode)
+      .thenReturn(Mode.RESTORE)
+      .thenReturn(Mode.RESTORE)
+      .thenReturn(Mode.RESTORE)
+
+    engine.restore(iterator)
 
     assertEquals(3, metrics.restoredMessagesGauge.getValue)
     assertEquals(15, metrics.restoredBytesGauge.getValue) // 3 keys * 2 bytes/key +  3 msgs * 3 bytes/msg