You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/02 01:15:25 UTC
[samza] branch master updated: Transactional State [1/5]: Added Store Checkpoint API
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 18755a3 Transactional State [1/5]: Added Store Checkpoint API
18755a3 is described below
commit 18755a38eef35d9b1b730fef6158efefab80832c
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 1 18:15:19 2019 -0700
Transactional State [1/5]: Added Store Checkpoint API
This PR adds a new 'checkpoint' API to KeyValueStore that allows taking persistent (on-disk) snapshots of the local store state during commits for implementations that support it.
In RocksDB, this creates a new Checkpoint directory.
To support transactional state, during a commit the newest changelog system stream partition (SSP) offset is stored in the OFFSET file in this checkpoint directory, as well as in the checkpoint topic. During container start / store restore, the checkpoint directory corresponding to the changelog offset in the checkpoint topic is selected (if any) and used as the basis for restore.
Thanks to @xinyuiscool for contributing this.
---
build.gradle | 1 +
gradle/dependency-versions.gradle | 1 +
.../java/org/apache/samza/storage/StorageEngine.java | 7 +++++++
.../org/apache/samza/storage/kv/KeyValueStore.java | 8 ++++++++
.../samza/operators/util/InternalInMemoryStore.java | 7 +++++++
.../operators/impl/store/TestInMemoryStore.java | 7 +++++++
.../org/apache/samza/storage/MockStorageEngine.java | 7 +++++++
.../storage/kv/inmemory/InMemoryKeyValueStore.scala | 7 +++++++
.../samza/storage/kv/RocksDbKeyValueStore.scala | 14 ++++++++++++--
.../samza/storage/kv/LargeMessageSafeStore.java | 7 +++++++
.../apache/samza/storage/kv/AccessLoggedStore.scala | 7 +++++++
.../kv/BaseKeyValueStorageEngineFactory.scala | 19 +++++++++----------
.../org/apache/samza/storage/kv/CachedStore.scala | 7 ++++++-
.../samza/storage/kv/KeyValueStorageEngine.scala | 20 ++++++++++++++------
.../storage/kv/KeyValueStorageEngineMetrics.scala | 6 ++++--
.../org/apache/samza/storage/kv/LoggedStore.scala | 7 +++++++
.../samza/storage/kv/NullSafeKeyValueStore.scala | 7 +++++++
.../samza/storage/kv/SerializedKeyValueStore.scala | 6 ++++++
.../apache/samza/storage/kv/MockKeyValueStore.scala | 6 ++++++
.../samza/storage/kv/TestKeyValueStorageEngine.scala | 6 +++++-
20 files changed, 135 insertions(+), 22 deletions(-)
diff --git a/build.gradle b/build.gradle
index b7385f9..2c35918 100644
--- a/build.gradle
+++ b/build.gradle
@@ -184,6 +184,7 @@ project(":samza-core_$scalaSuffix") {
compile "net.sf.jopt-simple:jopt-simple:$joptSimpleVersion"
compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
compile "org.apache.commons:commons-lang3:$commonsLang3Version"
+ compile "commons-io:commons-io:$commonsIoVersion"
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "org.scala-lang:scala-library:$scalaVersion"
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 6fe812f..f70e879 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -25,6 +25,7 @@
commonsCollectionVersion = "3.2.1"
commonsHttpClientVersion = "3.1"
commonsLang3Version = "3.4"
+ commonsIoVersion = "2.6"
elasticsearchVersion = "2.2.0"
gsonVersion = "2.8.5"
guavaVersion = "23.0"
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 4e6950a..58c6368 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -21,6 +21,8 @@ package org.apache.samza.storage;
import java.util.Iterator;
+import java.nio.file.Path;
+import java.util.Optional;
import org.apache.samza.system.IncomingMessageEnvelope;
/**
@@ -52,6 +54,11 @@ public interface StorageEngine {
void flush();
/**
+ * Checkpoint store snapshots.
+ */
+ Optional<Path> checkpoint(String id);
+
+ /**
* Close the storage engine
*/
void stop();
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index 67d7fb3..fa9b4de 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -19,9 +19,12 @@
package org.apache.samza.storage.kv;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+
/**
* A key-value store that supports put, get, delete, and range queries.
@@ -141,4 +144,9 @@ public interface KeyValueStore<K, V> {
* Flushes this key-value store, if applicable.
*/
void flush();
+
+ /**
+ * Checkpoint the store snapshot.
+ */
+ Optional<Path> checkpoint(String id);
}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 10a92f8..2ad25eb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -24,11 +24,13 @@ import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/**
* Implements a {@link KeyValueStore} using an in-memory Java Map.
@@ -136,4 +138,9 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
public void flush() {
//not applicable
}
+
+ @Override
+ public Optional<Path> checkpoint(String id) {
+ return Optional.empty();
+ }
}
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index 9c2306a..c742409 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -25,10 +25,12 @@ import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
+import java.nio.file.Path;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -129,6 +131,11 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
}
+ @Override
+ public Optional<Path> checkpoint(String id) {
+ return Optional.empty();
+ }
+
private static class InMemoryIterator<K, V> implements KeyValueIterator<K, V> {
Iterator<Map.Entry<byte[], byte[]>> wrapped;
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index fda1355..5d46b43 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -20,11 +20,13 @@
package org.apache.samza.storage;
import java.io.File;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
@@ -61,6 +63,11 @@ public class MockStorageEngine implements StorageEngine {
}
@Override
+ public Optional<Path> checkpoint(String id) {
+ return Optional.empty();
+ }
+
+ @Override
public void stop() {
}
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index 988d1c9..d482c15 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -21,7 +21,9 @@ package org.apache.samza.storage.kv.inmemory
import com.google.common.primitives.UnsignedBytes
import org.apache.samza.util.Logging
import org.apache.samza.storage.kv._
+import java.nio.file.Path
import java.util
+import java.util.Optional
/**
* In memory implementation of a key value store.
@@ -124,4 +126,9 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
override def close() { }
}
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ // No checkpoint being persisted. State restores from Changelog.
+ Optional.empty()
+ }
}
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index e4f78d3..a2ae8b0 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -20,13 +20,16 @@
package org.apache.samza.storage.kv
import java.io.File
+import java.nio.file.{Path, Paths}
import java.util
-import java.util.Comparator
+import java.util.{Comparator, Optional}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
-import org.apache.samza.SamzaException
+import org.apache.commons.io.FileUtils
+import org.apache.samza.{SamzaException, checkpoint}
import org.apache.samza.config.Config
+import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.util.Logging
import org.rocksdb.{TtlDB, _}
@@ -236,6 +239,13 @@ class RocksDbKeyValueStore(
trace("Flushed store: %s" format storeName)
}
+ override def checkpoint(id: String): Optional[Path] = {
+ val checkpoint = Checkpoint.create(db)
+ val checkpointPath = dir.getPath + "-" + id
+ checkpoint.createCheckpoint(checkpointPath)
+ Optional.of(Paths.get(checkpointPath))
+ }
+
def close(): Unit = {
trace("Calling compact range.")
stateChangeLock.writeLock().lock()
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index f53f9e8..7e514e7 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -18,8 +18,10 @@
*/
package org.apache.samza.storage.kv;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +125,11 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
store.flush();
}
+ @Override
+ public Optional<Path> checkpoint(String id) {
+ return store.checkpoint(id);
+ }
+
private void validateMessageSize(byte[] message) {
if (!dropLargeMessages && isLargeMessage(message)) {
throw new RecordTooLargeException("The message size " + message.length + " for store " + storeName
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index 78a7b0b..ace7aa5 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -20,7 +20,10 @@
package org.apache.samza.storage.kv
+import java.nio.file.Path
import java.util
+import java.util.Optional
+
import org.apache.samza.config.StorageConfig
import org.apache.samza.task.MessageCollector
import org.apache.samza.util.Logging
@@ -159,4 +162,8 @@ class AccessLoggedStore[K, V](
val bytes = keySerde.toBytes(key)
bytes
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ store.checkpoint(id)
+ }
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
index 1978426..fef1deb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
@@ -69,18 +69,18 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
* @param storeDir The directory of the storage engine.
* @param keySerde The serializer to use for serializing keys when reading or writing to the store.
* @param msgSerde The serializer to use for serializing messages when reading or writing to the store.
- * @param collector MessageCollector the storage engine uses to persist changes.
+ * @param changelogCollector MessageCollector the storage engine uses to persist changes.
* @param registry MetricsRegistry to which to publish storage-engine specific metrics.
- * @param changeLogSystemStreamPartition Samza stream partition from which to receive the changelog.
+ * @param changelogSSP Samza system stream partition from which to receive the changelog.
* @param containerContext Information about the container in which the task is executing.
**/
def getStorageEngine(storeName: String,
storeDir: File,
keySerde: Serde[K],
msgSerde: Serde[V],
- collector: MessageCollector,
+ changelogCollector: MessageCollector,
registry: MetricsRegistry,
- changeLogSystemStreamPartition: SystemStreamPartition,
+ changelogSSP: SystemStreamPartition,
jobContext: JobContext,
containerContext: ContainerContext, storeMode : StoreMode): StorageEngine = {
val storageConfigSubset = jobContext.getConfig.subset("stores." + storeName + ".", true)
@@ -117,15 +117,15 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
}
val rawStore =
- getKVStore(storeName, storeDir, registry, changeLogSystemStreamPartition, jobContext, containerContext, storeMode)
+ getKVStore(storeName, storeDir, registry, changelogSSP, jobContext, containerContext, storeMode)
// maybe wrap with logging
- val maybeLoggedStore = if (changeLogSystemStreamPartition == null) {
+ val maybeLoggedStore = if (changelogSSP == null) {
rawStore
} else {
val loggedStoreMetrics = new LoggedStoreMetrics(storeName, registry)
storePropertiesBuilder = storePropertiesBuilder.setLoggedStore(true)
- new LoggedStore(rawStore, changeLogSystemStreamPartition, collector, loggedStoreMetrics)
+ new LoggedStore(rawStore, changelogSSP, changelogCollector, loggedStoreMetrics)
}
var toBeAccessLoggedStore: KeyValueStore[K, V] = null
@@ -166,7 +166,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
}
val maybeAccessLoggedStore = if (accessLog) {
- new AccessLoggedStore(toBeAccessLoggedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
+ new AccessLoggedStore(toBeAccessLoggedStore, changelogCollector, changelogSSP, storageConfig, storeName, keySerde)
} else {
toBeAccessLoggedStore
}
@@ -175,7 +175,6 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
val nullSafeStore = new NullSafeKeyValueStore(maybeAccessLoggedStore)
// create the storage engine and return
- // TODO: Decide if we should use raw bytes when restoring
val keyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics(storeName, registry)
val metricsConfig = new MetricsConfig(jobContext.getConfig)
val clock = if (metricsConfig.getMetricsTimerEnabled) {
@@ -189,7 +188,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
}
new KeyValueStorageEngine(storeName, storeDir, storePropertiesBuilder.build(), nullSafeStore, rawStore,
- keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
+ changelogSSP, changelogCollector, keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
}
def createCachedStore[K, V](storeName: String, registry: MetricsRegistry,
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 959e49e..41d2d9f 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -21,7 +21,8 @@ package org.apache.samza.storage.kv
import org.apache.samza.util.Logging
import scala.collection._
-import java.util.Arrays
+import java.nio.file.Path
+import java.util.{Arrays, Optional}
/**
* A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
@@ -291,6 +292,10 @@ class CachedStore[K, V](
override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
store.snapshot(from, to)
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ store.checkpoint(id)
+ }
}
private case class CacheEntry[K, V](var value: V, var dirty: mutable.DoubleLinkedList[K])
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 0434199..4a0116b 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -20,10 +20,13 @@
package org.apache.samza.storage.kv
import java.io.File
+import java.nio.file.Path
+import java.util.Optional
import org.apache.samza.util.Logging
import org.apache.samza.storage.{StorageEngine, StoreProperties}
-import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
import org.apache.samza.util.TimerUtil
import scala.collection.JavaConverters._
@@ -39,6 +42,8 @@ class KeyValueStorageEngine[K, V](
storeProperties: StoreProperties,
wrapperStore: KeyValueStore[K, V],
rawStore: KeyValueStore[Array[Byte], Array[Byte]],
+ changelogSSP: SystemStreamPartition,
+ changelogCollector: MessageCollector,
metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
batchSize: Int = 500,
val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
@@ -124,14 +129,9 @@ class KeyValueStorageEngine[K, V](
}
if (valBytes != null) {
- metrics.restoredBytes.inc(valBytes.length)
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + valBytes.length)
}
-
- metrics.restoredBytes.inc(keyBytes.length)
metrics.restoredBytesGauge.set(metrics.restoredBytesGauge.getValue + keyBytes.length)
-
- metrics.restoredMessages.inc()
metrics.restoredMessagesGauge.set(metrics.restoredMessagesGauge.getValue + 1)
count += 1
@@ -155,6 +155,14 @@ class KeyValueStorageEngine[K, V](
}
}
+ def checkpoint(id: String): Optional[Path] = {
+ updateTimer(metrics.checkpointNs) {
+ trace("Checkpointing.")
+ metrics.checkpoints.inc
+ wrapperStore.checkpoint(id)
+ }
+ }
+
def stop() = {
trace("Stopping.")
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
index 92889ed..e86155c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngineMetrics.scala
@@ -34,6 +34,7 @@ class KeyValueStorageEngineMetrics(
val deletes = newCounter("deletes")
val deleteAlls = newCounter("delete-alls")
val flushes = newCounter("flushes")
+ val checkpoints = newCounter("checkpoints")
val alls = newCounter("alls")
val ranges = newCounter("ranges")
val snapshots = newCounter("snapshots")
@@ -45,15 +46,16 @@ class KeyValueStorageEngineMetrics(
val deleteNs = newTimer("delete-ns")
val deleteAllNs = newTimer("delete-all-ns")
val flushNs = newTimer("flush-ns")
+ val checkpointNs = newTimer("checkpoint-ns")
val allNs = newTimer("all-ns")
val rangeNs = newTimer("range-ns")
val snapshotNs = newTimer("snapshot-ns")
- val restoredMessages = newCounter("messages-restored") //Deprecated
val restoredMessagesGauge = newGauge("restored-messages", 0)
+ val trimmedMessagesGauge = newGauge("trimmed-messages", 0)
- val restoredBytes = newCounter("messages-bytes") //Deprecated
val restoredBytesGauge = newGauge("restored-bytes", 0)
+ val trimmedBytesGauge = newGauge("trimmed-bytes", 0)
override def getPrefix = storeName + "-"
}
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index fa37b21..4c238bb 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -19,6 +19,9 @@
package org.apache.samza.storage.kv
+import java.nio.file.Path
+import java.util.Optional
+
import org.apache.samza.util.Logging
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
import org.apache.samza.task.MessageCollector
@@ -117,4 +120,8 @@ class LoggedStore[K, V](
override def snapshot(from: K, to: K): KeyValueSnapshot[K, V] = {
store.snapshot(from, to)
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ store.checkpoint(id)
+ }
}
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 6be0575..3bc4674 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -19,6 +19,9 @@
package org.apache.samza.storage.kv
+import java.nio.file.Path
+import java.util.Optional
+
import scala.collection.JavaConverters._
object NullSafeKeyValueStore {
@@ -95,4 +98,8 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
notNull(to, NullKeyErrorMessage)
store.snapshot(from, to)
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ store.checkpoint(id)
+ }
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 567e7b8..169452c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -19,6 +19,8 @@
package org.apache.samza.storage.kv
+import java.nio.file.Path
+import java.util.Optional
import org.apache.samza.util.Logging
import org.apache.samza.serializers._
@@ -163,4 +165,8 @@ class SerializedKeyValueStore[K, V](
}
}
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ store.checkpoint(id)
+ }
}
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index 4526641..c20c2c5 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -21,6 +21,8 @@ package org.apache.samza.storage.kv
import scala.collection.JavaConverters._
import java.util
+import java.nio.file.Path
+import java.util.Optional
/**
* A mock key-value store wrapper that handles serialization
@@ -73,4 +75,8 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
override def snapshot(from: String, to: String): KeyValueSnapshot[String, String] = {
throw new UnsupportedOperationException("iterator() not supported")
}
+
+ override def checkpoint(id: String): Optional[Path] = {
+ Optional.empty()
+ }
}
\ No newline at end of file
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
index 8806a81..5f648f6 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStorageEngine.scala
@@ -26,6 +26,7 @@ import org.apache.samza.Partition
import org.apache.samza.container.TaskName
import org.apache.samza.storage.StoreProperties
import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
+import org.apache.samza.task.MessageCollector
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.Mockito._
@@ -42,8 +43,11 @@ class TestKeyValueStorageEngine {
val storeName = "test-storeName"
val storeDir = mock(classOf[File])
val properties = mock(classOf[StoreProperties])
+ val changelogSSP = mock(classOf[SystemStreamPartition])
+ val changelogCollector = mock(classOf[MessageCollector])
metrics = new KeyValueStorageEngineMetrics
- engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv, metrics, clock = () => { getNextTimestamp() })
+ engine = new KeyValueStorageEngine[String, String](storeName, storeDir, properties, wrapperKv, rawKv,
+ changelogSSP, changelogCollector, metrics, clock = () => { getNextTimestamp() })
}
@After