You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/02 01:15:25 UTC

[samza] branch master updated: Transactional State [1/5]: Added Store Checkpoint API

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 18755a3  Transactional State [1/5]: Added Store Checkpoint API
18755a3 is described below

commit 18755a38eef35d9b1b730fef6158efefab80832c
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 1 18:15:19 2019 -0700

    Transactional State [1/5]: Added Store Checkpoint API
    
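    This PR adds a new 'checkpoint' API to StorageEngine and KeyValueStore that allows taking persistent (on-disk) snapshots of the local store state during commits, for implementations that support it. Implementations without on-disk snapshot support (e.g., the in-memory stores) return an empty Optional.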
    
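    For RocksDB-backed stores, this creates a RocksDB Checkpoint in a new directory whose path is the store directory path with a '-<id>' suffix, and returns that path.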
    
    This change only adds the API; the transactional-state logic that uses it is not part of this commit. In that design, during a commit, the newest changelog SSP offset is stored in the OFFSET file in this checkpoint directory, as well as in the checkpoint topic. During container start / store restore, the checkpoint directory corresponding to the changelog offset in the checkpoint topic is selected (if any) and used as the basis for restore.
    
    Thanks to @xinyuiscool for contributing this.
---
 build.gradle                                         |  1 +
 gradle/dependency-versions.gradle                    |  1 +
 .../java/org/apache/samza/storage/StorageEngine.java |  7 +++++++
 .../org/apache/samza/storage/kv/KeyValueStore.java   |  8 ++++++++
 .../samza/operators/util/InternalInMemoryStore.java  |  7 +++++++
 .../operators/impl/store/TestInMemoryStore.java      |  7 +++++++
 .../org/apache/samza/storage/MockStorageEngine.java  |  7 +++++++
 .../storage/kv/inmemory/InMemoryKeyValueStore.scala  |  7 +++++++
 .../samza/storage/kv/RocksDbKeyValueStore.scala      | 14 ++++++++++++--
 .../samza/storage/kv/LargeMessageSafeStore.java      |  7 +++++++
 .../apache/samza/storage/kv/AccessLoggedStore.scala  |  7 +++++++
 .../kv/BaseKeyValueStorageEngineFactory.scala        | 19 +++++++++----------
 .../org/apache/samza/storage/kv/CachedStore.scala    |  7 ++++++-
 .../samza/storage/kv/KeyValueStorageEngine.scala     | 20 ++++++++++++++------
 .../storage/kv/KeyValueStorageEngineMetrics.scala    |  6 ++++--
 .../org/apache/samza/storage/kv/LoggedStore.scala    |  7 +++++++
 .../samza/storage/kv/NullSafeKeyValueStore.scala     |  7 +++++++
 .../samza/storage/kv/SerializedKeyValueStore.scala   |  6 ++++++
 .../apache/samza/storage/kv/MockKeyValueStore.scala  |  6 ++++++
 .../samza/storage/kv/TestKeyValueStorageEngine.scala |  6 +++++-
 20 files changed, 135 insertions(+), 22 deletions(-)

diff --git a/build.gradle b/build.gradle
index b7385f9..2c35918 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,6 +184,7 @@ project(":samza-core_$scalaSuffix") {
     compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
     compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
     compile "org.apache.commons:commons-lang3:$commonsLang3Version"
+    compile "commons-io:commons-io:$commonsIoVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "org.scala-lang:scala-library:$scalaVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 6fe812f..f70e879 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,6 +25,7 @@
   commonsCollectionVersion = "3.2.1"
   commonsHttpClientVersion = "3.1"
   commonsLang3Version = "3.4"
+  commonsIoVersion = "2.6"
   elasticsearchVersion = "2.2.0"
   gsonVersion = "2.8.5"
   guavaVersion = "23.0"
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 4e6950a..58c6368 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -21,6 +21,8 @@ package org.apache.samza.storage;
 
 import java.util.Iterator;
 
+import java.nio.file.Path;
+import java.util.Optional;
 import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
@@ -52,6 +54,11 @@ public interface StorageEngine {
   void flush();
 
   /**
+   * Checkpoint store snapshots.
+   */
+  Optional<Path> checkpoint(String id);
+
+  /**
    * Close the storage engine
    */
   void stop();
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 67d7fb3..fa9b4de 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -19,9 +19,12 @@
 
 package org.apache.samza.storage.kv;
 
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+
 
 /**
  * A key-value store that supports put, get, delete, and range queries.
@@ -141,4 +144,9 @@ public interface KeyValueStore<K, V> {
    * Flushes this key-value store, if applicable.
    */
   void flush();
+
+  /**
+   * Checkpoint the store snapshot.
+   */
+  Optional<Path> checkpoint(String id);
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 10a92f8..2ad25eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -24,11 +24,13 @@ import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Implements a {@link KeyValueStore} using an in-memory Java Map.
@@ -136,4 +138,9 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
   public void flush() {
     //not applicable
   }
+
+  @Override
+  public Optional<Path> checkpoint(String id) {
+    return Optional.empty();
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index 9c2306a..c742409 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -25,10 +25,12 @@ import org.apache.samza.storage.kv.KeyValueSnapshot;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
@@ -129,6 +131,11 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
 
   }
 
+  @Override
+  public Optional<Path> checkpoint(String id) {
+    return Optional.empty();
+  }
+
   private static class InMemoryIterator<K, V> implements KeyValueIterator<K, V> {
 
     Iterator<Map.Entry<byte[], byte[]>> wrapped;
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index fda1355..5d46b43 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -20,11 +20,13 @@
 package org.apache.samza.storage;
 
 import java.io.File;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 
 import java.util.List;
+import java.util.Optional;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 
@@ -61,6 +63,11 @@ public class MockStorageEngine implements StorageEngine {
   }
 
   @Override
+  public Optional<Path> checkpoint(String id) {
+    return Optional.empty();
+  }
+
+  @Override
   public void stop() {
   }
 
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 988d1c9..d482c15 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -21,7 +21,9 @@ package org.apache.samza.storage.kv.inmemory
 import com.google.common.primitives.UnsignedBytes
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.kv._
+import java.nio.file.Path
 import java.util
+import java.util.Optional
 
 /**
  * In memory implementation of a key value store.
@@ -124,4 +126,9 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
       override def close() { }
     }
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    // No checkpoint being persisted. State restores from Changelog.
+    Optional.empty()
+  }
 }
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index e4f78d3..a2ae8b0 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,13 +20,16 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.nio.file.{Path, Paths}
 import java.util
-import java.util.Comparator
+import java.util.{Comparator, Optional}
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import org.apache.samza.SamzaException
+import org.apache.commons.io.FileUtils
+import org.apache.samza.{SamzaException, checkpoint}
 import org.apache.samza.config.Config
+import org.apache.samza.serializers.CheckpointSerde
 import org.apache.samza.util.Logging
 import org.rocksdb.{TtlDB, _}
 
@@ -236,6 +239,13 @@ class RocksDbKeyValueStore(
     trace("Flushed store: %s" format storeName)
   }
 
+  override def checkpoint(id: String): Optional[Path] = {
+    val checkpoint = Checkpoint.create(db)
+    val checkpointPath = dir.getPath + "-" + id
+    checkpoint.createCheckpoint(checkpointPath)
+    Optional.of(Paths.get(checkpointPath))
+  }
+
   def close(): Unit = {
     trace("Calling compact range.")
     stateChangeLock.writeLock().lock()
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index f53f9e8..7e514e7 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -18,8 +18,10 @@
  */
 package org.apache.samza.storage.kv;
 
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,6 +125,11 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
     store.flush();
   }
 
+  @Override
+  public Optional<Path> checkpoint(String id) {
+    return store.checkpoint(id);
+  }
+
   private void validateMessageSize(byte[] message) {
     if (!dropLargeMessages && isLargeMessage(message)) {
       throw new RecordTooLargeException("The message size " + message.length + " for store " + storeName
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 78a7b0b..ace7aa5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -20,7 +20,10 @@
 package org.apache.samza.storage.kv
 
 
+import java.nio.file.Path
 import java.util
+import java.util.Optional
+
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.Logging
@@ -159,4 +162,8 @@ class AccessLoggedStore[K, V](
     val bytes = keySerde.toBytes(key)
     bytes
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    store.checkpoint(id)
+  }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index 1978426..fef1deb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -69,18 +69,18 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
    * @param storeDir The directory of the storage engine.
    * @param keySerde The serializer to use for serializing keys when reading or writing to the store.
    * @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
-   * @param collector MessageCollector the storage engine uses to persist changes.
+   * @param changelogCollector MessageCollector the storage engine uses to persist changes.
    * @param registry MetricsRegistry to which to publish storage-engine specific metrics.
-   * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+   * @param changelogSSP Samza system stream partition from which to receive the changelog.
    * @param containerContext Information about the container in which the task is executing.
    **/
   def getStorageEngine(storeName: String,
     storeDir: File,
     keySerde: Serde[K],
     msgSerde: Serde[V],
-    collector: MessageCollector,
+    changelogCollector: MessageCollector,
     registry: MetricsRegistry,
-    changeLogSystemStreamPartition: SystemStreamPartition,
+    changelogSSP: SystemStreamPartition,
     jobContext: JobContext,
     containerContext: ContainerContext, storeMode : StoreMode): StorageEngine = {
     val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true)
@@ -117,15 +117,15 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     }
 
     val rawStore =
-      getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode)
+      getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode)
 
     // maybe wrap with logging
-    val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
+    val maybeLoggedStore = if (changelogSSP == null) {
       rawStore
     } else {
       val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
       storePropertiesBuilder = storePropertiesBuilder.setLoggedStore(true)
-      new LoggedStore(rawStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
+      new LoggedStore(rawStore, changelogSSP, changelogCollector, loggedStoreMetrics)
     }
 
     var toBeAccessLoggedStore: KeyValueStore[K, V] = null
@@ -166,7 +166,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     }
 
     val maybeAccessLoggedStore = if (accessLog) {
-      new AccessLoggedStore(toBeAccessLoggedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
+      new AccessLoggedStore(toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig, storeName, keySerde)
     } else {
       toBeAccessLoggedStore
     }
@@ -175,7 +175,6 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     val nullSafeStore = new NullSafeKeyValueStore(maybeAccessLoggedStore)
 
     // create the storage engine and return
-    // TODO: Decide if we should use raw bytes when restoring
     val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
     val metricsConfig = new MetricsConfig(jobContext.getConfig)
     val clock = if (metricsConfig.getMetricsTimerEnabled) {
@@ -189,7 +188,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     }
 
     new KeyValueStorageEngine(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
-      keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
+      changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
   }
 
   def createCachedStore[K, V](storeName: String, registry: MetricsRegistry,
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 959e49e..41d2d9f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -21,7 +21,8 @@ package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Logging
 import scala.collection._
-import java.util.Arrays
+import java.nio.file.Path
+import java.util.{Arrays, Optional}
 
 /**
  * A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
@@ -291,6 +292,10 @@ class CachedStore[K, V](
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     store.snapshot(from, to)
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    store.checkpoint(id)
+  }
 }
 
 private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 0434199..4a0116b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -20,10 +20,13 @@
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.nio.file.Path
+import java.util.Optional
 
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StorageEngine, StoreProperties}
-import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
 import org.apache.samza.util.TimerUtil
 
 import scala.collection.JavaConverters._
@@ -39,6 +42,8 @@ class KeyValueStorageEngine[K, V](
   storeProperties: StoreProperties,
   wrapperStore: KeyValueStore[K, V],
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
+  changelogSSP: SystemStreamPartition,
+  changelogCollector: MessageCollector,
   metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
   batchSize: Int = 500,
   val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
@@ -124,14 +129,9 @@ class KeyValueStorageEngine[K, V](
       }
 
       if (valBytes != null) {
-        metrics.restoredBytes.inc(valBytes.length)
         metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
       }
-
-      metrics.restoredBytes.inc(keyBytes.length)
       metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
-
-      metrics.restoredMessages.inc()
       metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
       count += 1
 
@@ -155,6 +155,14 @@ class KeyValueStorageEngine[K, V](
     }
   }
 
+  def checkpoint(id: String): Optional[Path] = {
+    updateTimer(metrics.checkpointNs) {
+      trace("Checkpointing.")
+      metrics.checkpoints.inc
+      wrapperStore.checkpoint(id)
+    }
+  }
+
   def stop() = {
     trace("Stopping.")
 
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 92889ed..e86155c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -34,6 +34,7 @@ class KeyValueStorageEngineMetrics(
   val deletes = newCounter("deletes")
   val deleteAlls = newCounter("delete-alls")
   val flushes = newCounter("flushes")
+  val checkpoints = newCounter("checkpoints")
   val alls = newCounter("alls")
   val ranges = newCounter("ranges")
   val snapshots = newCounter("snapshots")
@@ -45,15 +46,16 @@ class KeyValueStorageEngineMetrics(
   val deleteNs = newTimer("delete-ns")
   val deleteAllNs = newTimer("delete-all-ns")
   val flushNs = newTimer("flush-ns")
+  val checkpointNs = newTimer("checkpoint-ns")
   val allNs = newTimer("all-ns")
   val rangeNs = newTimer("range-ns")
   val snapshotNs = newTimer("snapshot-ns")
 
-  val restoredMessages = newCounter("messages-restored") //Deprecated
   val restoredMessagesGauge = newGauge("restored-messages", 0)
+  val trimmedMessagesGauge = newGauge("trimmed-messages", 0)
 
-  val restoredBytes = newCounter("messages-bytes") //Deprecated
   val restoredBytesGauge = newGauge("restored-bytes", 0)
+  val trimmedBytesGauge = newGauge("trimmed-bytes", 0)
 
   override def getPrefix = storeName + "-"
 }
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index fa37b21..4c238bb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -19,6 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import java.nio.file.Path
+import java.util.Optional
+
 import org.apache.samza.util.Logging
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
 import org.apache.samza.task.MessageCollector
@@ -117,4 +120,8 @@ class LoggedStore[K, V](
   override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
     store.snapshot(from, to)
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    store.checkpoint(id)
+  }
 }
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 6be0575..3bc4674 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -19,6 +19,9 @@
 
 package org.apache.samza.storage.kv
 
+import java.nio.file.Path
+import java.util.Optional
+
 import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
@@ -95,4 +98,8 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
     notNull(to, NullKeyErrorMessage)
     store.snapshot(from, to)
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    store.checkpoint(id)
+  }
 }
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 567e7b8..169452c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -19,6 +19,8 @@
 
 package org.apache.samza.storage.kv
 
+import java.nio.file.Path
+import java.util.Optional
 import org.apache.samza.util.Logging
 import org.apache.samza.serializers._
 
@@ -163,4 +165,8 @@ class SerializedKeyValueStore[K, V](
       }
     }
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    store.checkpoint(id)
+  }
 }
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index 4526641..c20c2c5 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -21,6 +21,8 @@ package org.apache.samza.storage.kv
 
 import scala.collection.JavaConverters._
 import java.util
+import java.nio.file.Path
+import java.util.Optional
 
 /**
  * A mock key-value store wrapper that handles serialization
@@ -73,4 +75,8 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   override def snapshot(from: String, to: String): KeyValueSnapshot[String, String] = {
     throw new UnsupportedOperationException("iterator() not supported")
   }
+
+  override def checkpoint(id: String): Optional[Path] = {
+    Optional.empty()
+  }
 }
\ No newline at end of file
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 8806a81..5f648f6 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -26,6 +26,7 @@ import org.apache.samza.Partition
 import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Mockito._
@@ -42,8 +43,11 @@ class TestKeyValueStorageEngine {
     val storeName = "test-storeName"
     val storeDir = mock(classOf[File])
     val properties = mock(classOf[StoreProperties])
+    val changelogSSP = mock(classOf[SystemStreamPartition])
+    val changelogCollector = mock(classOf[MessageCollector])
     metrics = new KeyValueStorageEngineMetrics
-    engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
+    engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv,
+      changelogSSP, changelogCollector, metrics, clock = () => { getNextTimestamp() })
   }
 
   @After