You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2019/10/23 01:40:34 UTC
[samza] 03/03: SAMZA-2356: [Transactional State] Do not trim
changelog if time since last checkpoint is greater than min.compaction.lag.
(#1196)
This is an automated email from the ASF dual-hosted git repository.
pmaheshwari pushed a commit to branch 1.3.0
in repository https://gitbox.apache.org/repos/asf/samza.git
commit cb2707f4151cfa5e58eddc46ffc38cb633397349
Author: Prateek Maheshwari <pr...@utexas.edu>
AuthorDate: Tue Oct 22 17:55:54 2019 -0700
SAMZA-2356: [Transactional State] Do not trim changelog if time since last checkpoint is greater than min.compaction.lag. (#1196)
---
.../org/apache/samza/checkpoint/CheckpointId.java | 82 +++++++
.../checkpoint/CheckpointedChangelogOffset.java | 82 +++++++
.../org/apache/samza/storage/StorageEngine.java | 5 +-
.../org/apache/samza/storage/kv/KeyValueStore.java | 5 +-
.../java/org/apache/samza/config/TaskConfig.java | 8 +-
.../operators/util/InternalInMemoryStore.java | 3 +-
.../TransactionalStateTaskRestoreManager.java | 62 ++++--
.../org/apache/samza/container/TaskInstance.scala | 23 +-
.../NonTransactionalStateTaskStorageManager.scala | 8 +-
.../apache/samza/storage/TaskStorageManager.scala | 5 +-
.../TransactionalStateTaskStorageManager.scala | 12 +-
.../operators/impl/store/TestInMemoryStore.java | 4 +-
.../apache/samza/storage/MockStorageEngine.java | 3 +-
.../TestTransactionalStateTaskRestoreManager.java | 241 +++++++++++++++++++--
.../apache/samza/container/TestTaskInstance.scala | 22 +-
.../samza/storage/TestContainerStorageManager.java | 2 +-
.../TestTransactionalStateTaskStorageManager.java | 20 +-
.../kv/inmemory/InMemoryKeyValueStore.scala | 4 +-
.../samza/storage/kv/RocksDbKeyValueStore.scala | 12 +-
.../samza/storage/kv/LargeMessageSafeStore.java | 3 +-
.../samza/storage/kv/AccessLoggedStore.scala | 3 +-
.../org/apache/samza/storage/kv/CachedStore.scala | 5 +-
.../samza/storage/kv/KeyValueStorageEngine.scala | 4 +-
.../org/apache/samza/storage/kv/LoggedStore.scala | 3 +-
.../samza/storage/kv/NullSafeKeyValueStore.scala | 4 +-
.../samza/storage/kv/SerializedKeyValueStore.scala | 4 +-
.../samza/storage/kv/MockKeyValueStore.scala | 4 +-
.../kv/TransactionalStateIntegrationTest.java | 2 +-
...ransactionalStateMultiStoreIntegrationTest.java | 2 +-
29 files changed, 528 insertions(+), 109 deletions(-)
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
new file mode 100644
index 0000000..95dfd24
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointId.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Checkpoint ID has the format: [currentTimeMillis, last 6 digits of nanotime], separated by a dash.
+ * This is to avoid conflicts, e.g., when requesting frequent manual commits.
+ *
+ * It is expected that persistent stores use the {@link #toString()} representation of the checkpoint id
+ * as the store checkpoint directory name.
+ */
+@InterfaceStability.Unstable
+public class CheckpointId {
+  public static final String SEPARATOR = "-";
+
+  // keep only the last 6 decimal digits of System.nanoTime()
+  private static final long NANOS_MODULUS = 1000000L;
+
+  private final long millis;
+  private final long nanos;
+
+  public CheckpointId(long millis, long nanos) {
+    this.millis = millis;
+    this.nanos = nanos;
+  }
+
+  /**
+   * Creates a checkpoint id from the current wall-clock millis and the last 6 digits of {@link System#nanoTime()}.
+   *
+   * {@code System.nanoTime()} may return a negative value, and {@code %} keeps the sign of the dividend.
+   * A negative nanos component would serialize as "millis--nanos", which {@link #fromString(String)} cannot
+   * parse back, so the component is normalized into [0, 999999] with {@link Math#floorMod(long, long)}.
+   */
+  public static CheckpointId create() {
+    return new CheckpointId(System.currentTimeMillis(), Math.floorMod(System.nanoTime(), NANOS_MODULUS));
+  }
+
+  /**
+   * Parses the {@link #toString()} representation of a checkpoint id.
+   *
+   * @param checkpointId string of the form "millis-nanos"
+   * @return the parsed checkpoint id
+   * @throws IllegalArgumentException if the string is blank, does not have exactly two
+   *         separator-delimited parts, or either part is not a valid long
+   */
+  public static CheckpointId fromString(String checkpointId) {
+    if (StringUtils.isBlank(checkpointId)) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
+    }
+    String[] parts = checkpointId.split(SEPARATOR);
+    // without this check a missing separator surfaces as an ArrayIndexOutOfBoundsException
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId);
+    }
+    try {
+      return new CheckpointId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException("Invalid checkpoint id: " + checkpointId, e);
+    }
+  }
+
+  public long getMillis() {
+    return millis;
+  }
+
+  public long getNanos() {
+    return nanos;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", millis, SEPARATOR, nanos);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointId that = (CheckpointId) o;
+    return millis == that.millis &&
+        nanos == that.nanos;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(millis, nanos);
+  }
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
new file mode 100644
index 0000000..407ce7a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointedChangelogOffset.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.checkpoint;
+
+import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Checkpointed changelog offset has the format: [checkpointId, offset], separated by a colon.
+ * A null offset is written as the literal string "null" and read back as null.
+ */
+@InterfaceStability.Unstable
+public class CheckpointedChangelogOffset {
+  public static final String SEPARATOR = ":";
+
+  private final CheckpointId checkpointId;
+  private final String offset;
+
+  public CheckpointedChangelogOffset(CheckpointId checkpointId, String offset) {
+    this.checkpointId = checkpointId;
+    this.offset = offset;
+  }
+
+  /**
+   * Parses the {@link #toString()} representation of a checkpointed changelog offset.
+   *
+   * Only the first separator is significant. A {@link CheckpointId} string contains only digits and
+   * {@link CheckpointId#SEPARATOR}, never {@link #SEPARATOR}. The offset, however, is an opaque
+   * system-specific string. Splitting on every occurrence would reject values that {@link #toString()}
+   * itself produced.
+   *
+   * @param message string of the form "checkpointId:offset"
+   * @return the parsed value; its offset is null if the message's offset part is "null"
+   * @throws IllegalArgumentException if the message is blank, has no separator, or has an invalid checkpoint id
+   */
+  public static CheckpointedChangelogOffset fromString(String message) {
+    if (StringUtils.isBlank(message)) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog message: " + message);
+    }
+    String[] checkpointIdAndOffset = message.split(SEPARATOR, 2);
+    if (checkpointIdAndOffset.length != 2) {
+      throw new IllegalArgumentException("Invalid checkpointed changelog offset: " + message);
+    }
+    CheckpointId checkpointId = CheckpointId.fromString(checkpointIdAndOffset[0]);
+    // toString() writes a null offset as "null"; map it back
+    String offset = "null".equals(checkpointIdAndOffset[1]) ? null : checkpointIdAndOffset[1];
+    return new CheckpointedChangelogOffset(checkpointId, offset);
+  }
+
+  public CheckpointId getCheckpointId() {
+    return checkpointId;
+  }
+
+  public String getOffset() {
+    return offset;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("%s%s%s", checkpointId, SEPARATOR, offset);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CheckpointedChangelogOffset that = (CheckpointedChangelogOffset) o;
+    return Objects.equals(checkpointId, that.checkpointId) &&
+        Objects.equals(offset, that.offset);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(checkpointId, offset);
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index 804a250..8add1de 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
@@ -22,6 +22,8 @@ package org.apache.samza.storage;
import java.nio.file.Path;
import java.util.Optional;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.system.ChangelogSSPIterator;
/**
@@ -55,7 +57,8 @@ public interface StorageEngine {
/**
* Checkpoint store snapshots.
*/
- Optional<Path> checkpoint(String id);
+ @InterfaceStability.Unstable
+ Optional<Path> checkpoint(CheckpointId id);
/**
* Close the storage engine
diff --git a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
index d262e29..41faac3 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/kv/KeyValueStore.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.checkpoint.CheckpointId;
/**
@@ -150,6 +151,6 @@ public interface KeyValueStore<K, V> {
* Create a persistent checkpoint / snapshot of the current store state and return it's path.
* @return the path of the persistent store checkpoint, or an empty optional if checkpoints are not supported.
*/
- @InterfaceStability.Evolving
- Optional<Path> checkpoint(String id);
+ @InterfaceStability.Unstable
+ Optional<Path> checkpoint(CheckpointId id);
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 22e0fa9..b02f6c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -110,8 +110,8 @@ public class TaskConfig extends MapConfig {
private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
public static final String TRANSACTIONAL_STATE_RESTORE_ENABLED = "task.transactional.state.restore.enabled";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED = false;
- public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE =
- "task.transactional.state.retain.existing.changelog.state";
+ public static final String TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE =
+ "task.transactional.state.retain.existing.state";
private static final boolean DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE = true;
public TaskConfig(Config config) {
@@ -313,7 +313,7 @@ public class TaskConfig extends MapConfig {
return getBoolean(TRANSACTIONAL_STATE_RESTORE_ENABLED, DEFAULT_TRANSACTIONAL_STATE_RESTORE_ENABLED);
}
- public boolean getTransactionalStateRetainExistingChangelogState() {
- return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
+ public boolean getTransactionalStateRetainExistingState() {
+ return getBoolean(TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, DEFAULT_TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE);
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index 2ad25eb..6a5ebf8 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.util;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueSnapshot;
import org.apache.samza.storage.kv.KeyValueIterator;
@@ -140,7 +141,7 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public Optional<Path> checkpoint(String id) {
+ public Optional<Path> checkpoint(CheckpointId id) {
return Optional.empty();
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 4dd7f59..1e54ea1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -31,9 +31,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
@@ -230,8 +233,16 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP);
String oldestOffset = changelogSSPMetadata.getOldestOffset();
String newestOffset = changelogSSPMetadata.getNewestOffset();
- String checkpointedOffset = checkpointedChangelogOffsets.get(changelogSSP);
+ String checkpointMessage = checkpointedChangelogOffsets.get(changelogSSP);
+ String checkpointedOffset = null; // can be null if no message, or message has null offset
+ long timeSinceLastCheckpointInMs = Long.MAX_VALUE;
+ if (StringUtils.isNotBlank(checkpointMessage)) {
+ CheckpointedChangelogOffset checkpointedChangelogOffset = CheckpointedChangelogOffset.fromString(checkpointMessage);
+ checkpointedOffset = checkpointedChangelogOffset.getOffset();
+ timeSinceLastCheckpointInMs = System.currentTimeMillis() -
+ checkpointedChangelogOffset.getCheckpointId().getMillis();
+ }
Optional<File> currentDirOptional;
Optional<List<File>> checkpointDirsOptional;
@@ -255,21 +266,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
storeDirsToDelete.put(storeName, currentDir);
});
- // first check if checkpointed offset is invalid (i.e., out of range of current offsets, or null)
if (checkpointedOffset == null && oldestOffset != null) {
// this can mean that either this is the initial migration for this feature and there are no previously
// checkpointed changelog offsets, or that this is a new store or changelog topic after the initial migration.
// if this is the first time migration, it might be desirable to retain existing data.
- // if this is new store or topic, it's possible that the container previously died after writing some data to
- // the changelog but before a commit, so it's desirable to delete the store, not restore anything and
+ // if this is new store or topic, it is possible that the container previously died after writing some data to
+ // the changelog but before a commit, so it is desirable to delete the store, not restore anything and
// trim the changelog
- // since we can't easily tell the difference b/w the two scenarios by just looking at the store and changelogs,
+ // since we can't tell the difference b/w the two scenarios by just looking at the store and changelogs,
// we'll request users to indicate whether to retain existing data using a config flag. this flag should only
// be set during migrations, and turned off after the first successful commit of the new container (i.e. next
// deploy). for simplicity, we'll always delete the local store, and restore from changelog if necessary.
+ // the former scenario should not be common. the recommended way to opt-in to the transactional state feature
+ // is to first upgrade to the latest samza version but keep the transactional state restore config off.
+ // this will create the store checkpoint directories and write the changelog offset to the checkpoint, but
+ // will not use them during restore. once this is done (i.e. at least one commit after upgrade), the
+ // transactional state restore feature can be turned on on subsequent deploys. this code path exists as a
+ // fail-safe against clearing changelogs in case users do not follow upgrade instructions and enable the
+ // feature directly.
checkpointDirsOptional.ifPresent(checkpointDirs ->
checkpointDirs.forEach(checkpointDir -> {
LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion since checkpointed " +
@@ -278,7 +295,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
storeDirsToDelete.put(storeName, checkpointDir);
}));
- if (new TaskConfig(config).getTransactionalStateRetainExistingChangelogState()) {
+ if (new TaskConfig(config).getTransactionalStateRetainExistingState()) {
// mark for restore from (oldest, newest) to recreate local state.
LOG.warn("Checkpointed offset for store: {} in task: {} is null. Since retain existing state is true, " +
"local state will be fully restored from current changelog contents. " +
@@ -290,7 +307,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
// mark for restore from (oldest, null) to trim entire changelog.
storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, null));
}
- } else if (// check if the checkpointed offset is in range of current oldest and newest offsets
+ } else if (// check if the checkpointed offset is out of range of current oldest and newest offsets
admin.offsetComparator(oldestOffset, checkpointedOffset) > 0 ||
admin.offsetComparator(checkpointedOffset, newestOffset) > 0) {
// checkpointed offset is out of range. this could mean that this is a TTL topic and the checkpointed
@@ -312,12 +329,29 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
"will be fully restored from current changelog contents.", storeName);
storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
} else { // persistent logged store
- // if there exists a valid store checkpoint directory with oldest offset <= local offset <= checkpointed offset,
+ String targetOffset;
+
+ // check checkpoint time against min.compaction.lag.ms. if older, restore from checkpointed offset to newest
+ // with no trim. be conservative. allow 10% safety margin to avoid deletions when the downtime is close
+ // to min.compaction.lag.ms
+ long minCompactionLagMs = new StorageConfig(config).getChangelogMinCompactionLagMs(storeName);
+ if (timeSinceLastCheckpointInMs > .9 * minCompactionLagMs) {
+ LOG.warn("Checkpointed offset for store: {} in task: {} is: {}. It is in range of oldest: {} and " +
+ "newest: {} changelog offset. However, time since last checkpoint is: {}, which is greater than " +
+ "0.9 * min.compaction.lag.ms: {} for the changelog topic. Since there is a chance that" +
+ "the changelog topic has been compacted, restoring store to the end of the current changelog contents." +
+ "There is no transactional local state guarantee.", storeName, taskName, checkpointedOffset,
+ oldestOffset, newestOffset, timeSinceLastCheckpointInMs, minCompactionLagMs);
+ targetOffset = newestOffset;
+ } else {
+ targetOffset = checkpointedOffset;
+ }
+
+ // if there exists a valid store checkpoint directory with oldest offset <= local offset <= target offset,
// retain it and restore the delta. delete all other checkpoint directories for the store. if more than one such
// checkpoint directory exists, retain the one with the highest local offset and delete the rest.
boolean hasValidCheckpointDir = false;
for (File checkpointDir: checkpointDirsOptional.get()) {
- // TODO HIGH pmaheshw: should validation check / warn for compact lag config staleness too?
if (storageManagerUtil.isLoggedStoreValid(
storeName, checkpointDir, config, storeChangelogs, taskModel, clock, storeEngines)) {
String localOffset = storageManagerUtil.readOffsetFile(
@@ -326,17 +360,17 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
checkpointDir, taskName);
if (admin.offsetComparator(localOffset, oldestOffset) >= 0 &&
- admin.offsetComparator(localOffset, checkpointedOffset) <= 0 &&
+ admin.offsetComparator(localOffset, targetOffset) <= 0 &&
(storesToRestore.get(storeName) == null ||
admin.offsetComparator(localOffset, storesToRestore.get(storeName).startingOffset) > 0)) {
hasValidCheckpointDir = true;
LOG.info("Temporarily marking checkpoint dir: {} for store: {} in task: {} for retention. " +
- "May be overridden later.", checkpointDir, storeName, taskName);
+ "May be overridden later.", checkpointDir, storeName, taskName);
storeDirToRetain.put(storeName, checkpointDir);
// mark for restore even if local == checkpointed, so that the changelog gets trimmed.
LOG.info("Temporarily marking store: {} in task: {} for restore from beginning offset: {} to " +
- "ending offset: {}. May be overridden later", storeName, taskName, localOffset, checkpointedOffset);
- storesToRestore.put(storeName, new RestoreOffsets(localOffset, checkpointedOffset));
+ "ending offset: {}. May be overridden later", storeName, taskName, localOffset, targetOffset);
+ storesToRestore.put(storeName, new RestoreOffsets(localOffset, targetOffset));
}
}
}
@@ -353,7 +387,7 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
// if the store had not valid checkpoint dirs to retain, restore from changelog
if (!hasValidCheckpointDir) {
- storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
+ storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, targetOffset));
}
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index a87c535..2a4f1d6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -24,7 +24,7 @@ import java.util.{Objects, Optional}
import java.util.concurrent.ScheduledExecutorService
import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointId, CheckpointedChangelogOffset, OffsetManager}
import org.apache.samza.config.{Config, StreamConfig, TaskConfig}
import org.apache.samza.context._
import org.apache.samza.job.model.{JobModel, TaskModel}
@@ -253,23 +253,26 @@ class TaskInstance(
trace("Flushing state stores for taskName: %s" format taskName)
newestChangelogOffsets = storageManager.flush()
trace("Got newest changelog offsets for taskName: %s as: %s " format(taskName, newestChangelogOffsets))
+ }
+
+ val checkpointId = CheckpointId.create()
+ if (storageManager != null && newestChangelogOffsets != null) {
+ trace("Checkpointing stores for taskName: %s with checkpoint id: %s" format (taskName, checkpointId))
+ storageManager.checkpoint(checkpointId, newestChangelogOffsets.toMap)
+ }
+
+ if (newestChangelogOffsets != null) {
newestChangelogOffsets.foreach {case (ssp, newestOffsetOption) =>
- allCheckpointOffsets.put(ssp, newestOffsetOption.orNull)
+ val offset = new CheckpointedChangelogOffset(checkpointId, newestOffsetOption.orNull).toString
+ allCheckpointOffsets.put(ssp, offset)
}
}
-
val checkpoint = new Checkpoint(allCheckpointOffsets)
trace("Got combined checkpoint offsets for taskName: %s as: %s" format (taskName, allCheckpointOffsets))
- var checkpointId: String = null
- if (storageManager != null && newestChangelogOffsets != null) {
- trace("Checkpointing stores for taskName: %s" format taskName)
- checkpointId = storageManager.checkpoint(newestChangelogOffsets.toMap)
- }
-
offsetManager.writeCheckpoint(taskName, checkpoint)
- if (storageManager != null && checkpointId != null) {
+ if (storageManager != null) {
trace("Remove old checkpoint stores for taskName: %s" format taskName)
storageManager.removeOldCheckpoints(checkpointId)
}
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
index cfca0f7..7b38749 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/NonTransactionalStateTaskStorageManager.scala
@@ -23,6 +23,7 @@ import java.io._
import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.ImmutableSet
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.container.TaskName
import org.apache.samza.job.model.TaskMode
import org.apache.samza.system._
@@ -57,11 +58,10 @@ class NonTransactionalStateTaskStorageManager(
newestChangelogSSPOffsets
}
- def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
- null
- }
+ override def checkpoint(checkpointId: CheckpointId,
+ newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {}
- override def removeOldCheckpoints(checkpointId: String): Unit = {}
+ override def removeOldCheckpoints(checkpointId: CheckpointId): Unit = {}
@VisibleForTesting
def stop() {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c98461b..50d6418 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -19,6 +19,7 @@
package org.apache.samza.storage
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.system.SystemStreamPartition
trait TaskStorageManager {
@@ -27,9 +28,9 @@ trait TaskStorageManager {
def flush(): Map[SystemStreamPartition, Option[String]]
- def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String
+ def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit
- def removeOldCheckpoints(checkpointId: String): Unit
+ def removeOldCheckpoints(checkpointId: CheckpointId): Unit
def stop(): Unit
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
index c808866..20c7271 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TransactionalStateTaskStorageManager.scala
@@ -26,6 +26,7 @@ import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.ImmutableSet
import org.apache.commons.io.FileUtils
import org.apache.commons.io.filefilter.WildcardFileFilter
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.{Partition, SamzaException}
import org.apache.samza.container.TaskName
import org.apache.samza.job.model.TaskMode
@@ -56,15 +57,14 @@ class TransactionalStateTaskStorageManager(
getNewestChangelogSSPOffsets(taskName, storeChangelogs, partition, systemAdmins)
}
- def checkpoint(newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): String = {
+ def checkpoint(checkpointId: CheckpointId, newestChangelogOffsets: Map[SystemStreamPartition, Option[String]]): Unit = {
debug("Checkpointing stores.")
- val id = System.currentTimeMillis().toString
val checkpointPaths = containerStorageManager.getAllStores(taskName).asScala
.filter { case (storeName, storeEngine) =>
storeEngine.getStoreProperties.isLoggedStore && storeEngine.getStoreProperties.isPersistedToDisk}
.flatMap { case (storeName, storeEngine) => {
- val pathOptional = storeEngine.checkpoint(id)
+ val pathOptional = storeEngine.checkpoint(checkpointId)
if (pathOptional.isPresent) {
Some(storeName, pathOptional.get())
} else {
@@ -74,11 +74,9 @@ class TransactionalStateTaskStorageManager(
.toMap
writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, newestChangelogOffsets)
-
- id
}
- def removeOldCheckpoints(latestCheckpointId: String): Unit = {
+ def removeOldCheckpoints(latestCheckpointId: CheckpointId): Unit = {
if (latestCheckpointId != null) {
debug("Removing older checkpoints before " + latestCheckpointId)
@@ -93,7 +91,7 @@ class TransactionalStateTaskStorageManager(
val checkpointDirs = storeDir.listFiles(fileFilter)
checkpointDirs
- .filter(!_.getName.contains(latestCheckpointId))
+ .filter(!_.getName.contains(latestCheckpointId.toString))
.foreach(checkpointDir => {
FileUtils.deleteDirectory(checkpointDir)
})
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
index c742409..0022cc7 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/store/TestInMemoryStore.java
@@ -19,6 +19,8 @@
package org.apache.samza.operators.impl.store;
import com.google.common.primitives.UnsignedBytes;
+
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.serializers.Serde;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueSnapshot;
@@ -132,7 +134,7 @@ public class TestInMemoryStore<K, V> implements KeyValueStore<K, V> {
}
@Override
- public Optional<Path> checkpoint(String id) {
+ public Optional<Path> checkpoint(CheckpointId id) {
return Optional.empty();
}
diff --git a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
index 405abd7..2fdb81a 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/MockStorageEngine.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.system.ChangelogSSPIterator;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
@@ -63,7 +64,7 @@ public class MockStorageEngine implements StorageEngine {
}
@Override
- public Optional<Path> checkpoint(String id) {
+ public Optional<Path> checkpoint(CheckpointId id) {
return Optional.empty();
}
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 789c0e7..879f8d5 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -31,6 +31,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.checkpoint.CheckpointedChangelogOffset;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
@@ -180,8 +182,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -242,9 +246,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "21";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -307,9 +313,79 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
+ } };
+ Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+ ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+ SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+ SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+ when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+ StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+ File mockLoggedStoreBaseDir = mock(File.class);
+ File mockNonLoggedStoreBaseDir = mock(File.class);
+ Config mockConfig = mock(Config.class);
+ Clock mockClock = mock(Clock.class);
+
+ Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+ .thenAnswer((Answer<Integer>) invocation -> {
+ String offset1 = (String) invocation.getArguments()[0];
+ String offset2 = (String) invocation.getArguments()[1];
+ return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+ });
+
+ StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+ mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+ mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+ mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+ // ensure that there is nothing to retain or delete
+ assertEquals(0, storeActions.storeDirsToDelete.size());
+ assertEquals(0, storeActions.storeDirsToRetain.size());
+ // ensure that we mark the store for full restore (from current oldest to current newest)
+ assertEquals("10", storeActions.storesToRestore.get(store1Name).startingOffset);
+ assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+ }
+
+ /**
+ * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when
+ * the job/container shut down and restarted after a long time.
+ */
+ @Test
+ public void testGetStoreActionsForLoggedNonPersistentStore_FullRestoreIfCheckpointedOffsetInRangeButMaybeCompacted() {
+ TaskModel mockTaskModel = mock(TaskModel.class);
+ TaskName taskName = new TaskName("Partition 0");
+ when(mockTaskModel.getTaskName()).thenReturn(taskName);
+ Partition taskChangelogPartition = new Partition(0);
+ when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+ String store1Name = "store1";
+ StorageEngine store1Engine = mock(StorageEngine.class);
+ StoreProperties mockStore1Properties = mock(StoreProperties.class);
+ when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+ when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+ when(mockStore1Properties.isPersistedToDisk()).thenReturn(false); // non-persistent store
+ Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+ String changelog1SystemName = "system1";
+ String changelog1StreamName = "store1Changelog";
+ SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+ SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
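+    // NOTE(review): checkpointed offset (5) is below the oldest changelog offset (10), i.e. out of range rather than "in range" as the test name says; confirm whether oldest should be "4" as in the persistent-store variant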
+ SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("10", "20", "21");
+ Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+ String changelog1CheckpointedOffset = "5";
+ CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint id older than default min.compaction.lag.ms
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+ Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+ new HashMap<SystemStreamPartition, String>() { {
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -375,9 +451,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = null;
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -389,7 +467,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -437,9 +515,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = null;
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -451,7 +531,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -499,8 +579,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -562,8 +644,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -643,8 +727,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -727,8 +813,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -810,8 +898,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -893,8 +983,10 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
ImmutableMap<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
- ImmutableMap.of(changelog1SSP, changelog1CheckpointedOffset);
+ ImmutableMap.of(changelog1SSP, changelog1CheckpointMessage.toString());
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -979,9 +1071,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = null;
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -993,7 +1087,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -1072,9 +1166,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = null;
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1086,7 +1182,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "false");
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "false");
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -1163,9 +1259,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1177,7 +1275,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -1250,9 +1348,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = null;
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1264,7 +1364,7 @@ public class TestTransactionalStateTaskRestoreManager {
File mockLoggedStoreBaseDir = mock(File.class);
File mockNonLoggedStoreBaseDir = mock(File.class);
HashMap<String, String> configMap = new HashMap<>();
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true"); // should not matter
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true"); // should not matter
Config mockConfig = new MapConfig(configMap);
Clock mockClock = mock(Clock.class);
@@ -1343,9 +1443,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "5";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
@@ -1403,6 +1505,97 @@ public class TestTransactionalStateTaskRestoreManager {
}
/**
+ * This can happen if the changelog offset is valid but the checkpoint is older than min compaction lag ms. E.g., when
+ * the job/container shut down and restarted after a long time.
+ */
+ @Test
+ public void testGetStoreActionsForLoggedPersistentStore_RestoreFromLocalToNewestIfCheckpointedOffsetInRangeButMaybeCompacted() {
+ TaskModel mockTaskModel = mock(TaskModel.class);
+ TaskName taskName = new TaskName("Partition 0");
+ when(mockTaskModel.getTaskName()).thenReturn(taskName);
+ Partition taskChangelogPartition = new Partition(0);
+ when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+
+ String store1Name = "store1";
+ StorageEngine store1Engine = mock(StorageEngine.class);
+ StoreProperties mockStore1Properties = mock(StoreProperties.class);
+ when(store1Engine.getStoreProperties()).thenReturn(mockStore1Properties);
+ when(mockStore1Properties.isLoggedStore()).thenReturn(true);
+ when(mockStore1Properties.isPersistedToDisk()).thenReturn(true);
+ Map<String, StorageEngine> mockStoreEngines = ImmutableMap.of(store1Name, store1Engine);
+
+ String changelog1SystemName = "system1";
+ String changelog1StreamName = "store1Changelog";
+ SystemStream changelog1SystemStream = new SystemStream(changelog1SystemName, changelog1StreamName);
+ SystemStreamPartition changelog1SSP = new SystemStreamPartition(changelog1SystemStream, taskChangelogPartition);
+ // checkpointed changelog offset is valid
+ SystemStreamPartitionMetadata changelog1SSPMetadata = new SystemStreamPartitionMetadata("4", "20", "21");
+ Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
+
+ String changelog1CheckpointedOffset = "5";
+ CheckpointId checkpointId = CheckpointId.fromString("0-0"); // checkpoint timestamp older than default min compaction lag
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(checkpointId, changelog1CheckpointedOffset);
+ Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
+ new HashMap<SystemStreamPartition, String>() { {
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
+ } };
+ Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
+ ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
+
+ SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
+ SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
+ when(mockSystemAdmins.getSystemAdmin(changelog1SSP.getSystem())).thenReturn(mockSystemAdmin);
+ StorageManagerUtil mockStorageManagerUtil = mock(StorageManagerUtil.class);
+ File mockLoggedStoreBaseDir = mock(File.class);
+ File mockNonLoggedStoreBaseDir = mock(File.class);
+ Config mockConfig = mock(Config.class);
+ Clock mockClock = mock(Clock.class);
+
+ File mockCurrentStoreDir = mock(File.class);
+ File mockStoreNewerCheckpointDir = mock(File.class);
+ File mockStoreOlderCheckpointDir = mock(File.class);
+ String olderCheckpointDirLocalOffset = "3";
+ String newerCheckpointDirLocalOffset = "5";
+ when(mockStorageManagerUtil.getTaskStoreDir(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+ .thenReturn(mockCurrentStoreDir);
+ when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(eq(mockLoggedStoreBaseDir), eq(store1Name), eq(taskName), any()))
+ .thenReturn(ImmutableList.of(mockStoreNewerCheckpointDir, mockStoreOlderCheckpointDir));
+ when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreNewerCheckpointDir), any(),
+ eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+ when(mockStorageManagerUtil.isLoggedStoreValid(eq(store1Name), eq(mockStoreOlderCheckpointDir), any(),
+ eq(mockStoreChangelogs), eq(mockTaskModel), any(), eq(mockStoreEngines))).thenReturn(true);
+ Set<SystemStreamPartition> mockChangelogSSPs = ImmutableSet.of(changelog1SSP);
+ when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreNewerCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+ .thenReturn(ImmutableMap.of(changelog1SSP, newerCheckpointDirLocalOffset));
+ when(mockStorageManagerUtil.readOffsetFile(eq(mockStoreOlderCheckpointDir), eq(mockChangelogSSPs), eq(false)))
+ .thenReturn(ImmutableMap.of(changelog1SSP, olderCheckpointDirLocalOffset)); // less than checkpointed offset (5)
+
+ Mockito.when(mockSystemAdmin.offsetComparator(anyString(), anyString()))
+ .thenAnswer((Answer<Integer>) invocation -> {
+ String offset1 = (String) invocation.getArguments()[0];
+ String offset2 = (String) invocation.getArguments()[1];
+ return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
+ });
+
+ StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
+ mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
+ mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
+ mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
+
+ // ensure that the current store dir and older checkpoint dir are marked for deletion
+ assertEquals(2, storeActions.storeDirsToDelete.get(store1Name).size());
+ assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockCurrentStoreDir));
+ assertTrue(storeActions.storeDirsToDelete.get(store1Name).contains(mockStoreOlderCheckpointDir));
+ // ensure that newer checkpoint dir is retained
+ assertEquals(1, storeActions.storeDirsToRetain.size());
+ assertEquals(mockStoreNewerCheckpointDir, storeActions.storeDirsToRetain.get(store1Name));
+ // ensure that we mark the store for restore to head (from local checkpoint to current newest)
+ assertEquals("5", storeActions.storesToRestore.get(store1Name).startingOffset);
+ assertEquals("20", storeActions.storesToRestore.get(store1Name).endingOffset);
+ }
+
+ /**
* This can happen if the changelog topic was manually deleted and recreated, and the checkpointed/local changelog
* offset is not valid anymore.
*/
@@ -1431,9 +1624,11 @@ public class TestTransactionalStateTaskRestoreManager {
Map<String, SystemStream> mockStoreChangelogs = ImmutableMap.of(store1Name, changelog1SystemStream);
String changelog1CheckpointedOffset = "21";
+ CheckpointedChangelogOffset changelog1CheckpointMessage =
+ new CheckpointedChangelogOffset(CheckpointId.create(), changelog1CheckpointedOffset);
Map<SystemStreamPartition, String> mockCheckpointedChangelogOffset =
new HashMap<SystemStreamPartition, String>() { {
- put(changelog1SSP, changelog1CheckpointedOffset);
+ put(changelog1SSP, changelog1CheckpointMessage.toString());
} };
Map<SystemStreamPartition, SystemStreamPartitionMetadata> mockCurrentChangelogOffsets =
ImmutableMap.of(changelog1SSP, changelog1SSPMetadata);
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 85cccf8..a54ae72 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -22,7 +22,7 @@ package org.apache.samza.container
import java.util.Collections
import org.apache.samza.{Partition, SamzaException}
-import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointedChangelogOffset, OffsetManager}
import org.apache.samza.config.MapConfig
import org.apache.samza.context.{TaskContext => _, _}
import org.apache.samza.job.model.TaskModel
@@ -216,10 +216,9 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
val changelogOffsets = Map(changelogSSP -> Some("5"))
- val checkpointId = "1234"
when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
when(this.taskStorageManager.flush()).thenReturn(changelogOffsets)
- when(this.taskStorageManager.checkpoint(any[Map[SystemStreamPartition, Option[String]]])).thenReturn(checkpointId)
+ doNothing().when(this.taskStorageManager).checkpoint(any(), any[Map[SystemStreamPartition, Option[String]]])
taskInstance.commit
val mockOrder = inOrder(this.offsetManager, this.collector, this.taskTableManager, this.taskStorageManager)
@@ -238,7 +237,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
mockOrder.verify(this.taskStorageManager).flush()
// Stores checkpoints should be created next with the newest changelog offsets
- mockOrder.verify(this.taskStorageManager).checkpoint(changelogOffsets)
+ mockOrder.verify(this.taskStorageManager).checkpoint(any(), Matchers.eq(changelogOffsets))
// Input checkpoint should be written with the snapshot captured at the beginning of commit and the
// newest changelog offset captured during storage manager flush
@@ -246,10 +245,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
mockOrder.verify(offsetManager).writeCheckpoint(any(), captor.capture)
val cp = captor.getValue
assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
- assertEquals("5", cp.getOffsets.get(changelogSSP))
+ assertEquals("5", CheckpointedChangelogOffset.fromString(cp.getOffsets.get(changelogSSP)).getOffset)
// Old checkpointed stores should be cleared
- mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(checkpointId)
+ mockOrder.verify(this.taskStorageManager).removeOldCheckpoints(any())
verify(commitsCounter).inc()
}
@@ -269,7 +268,10 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
verify(offsetManager).writeCheckpoint(any(), captor.capture)
val cp = captor.getValue
assertEquals("4", cp.getOffsets.get(SYSTEM_STREAM_PARTITION))
- assertEquals(null, cp.getOffsets.get(changelogSSP))
+ val message = cp.getOffsets.get(changelogSSP)
+ val checkpointedOffset = CheckpointedChangelogOffset.fromString(message)
+ assertNull(checkpointedOffset.getOffset)
+ assertNotNull(checkpointedOffset.getCheckpointId)
verify(commitsCounter).inc()
}
@@ -320,7 +322,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
- when(this.taskStorageManager.checkpoint(any())).thenThrow(new SamzaException("Error creating store checkpoint"))
+ when(this.taskStorageManager.checkpoint(any(), any())).thenThrow(new SamzaException("Error creating store checkpoint"))
try {
taskInstance.commit
@@ -341,8 +343,8 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
val inputOffsets = new Checkpoint(Map(SYSTEM_STREAM_PARTITION -> "4").asJava)
when(this.offsetManager.buildCheckpoint(TASK_NAME)).thenReturn(inputOffsets)
when(this.taskStorageManager.flush()).thenReturn(Map[SystemStreamPartition, Option[String]]())
- when(this.taskStorageManager.checkpoint(any())).thenReturn("id")
- when(this.taskStorageManager.removeOldCheckpoints("id"))
+ doNothing().when(this.taskStorageManager).checkpoint(any(), any())
+ when(this.taskStorageManager.removeOldCheckpoints(any()))
.thenThrow(new SamzaException("Error clearing old checkpoints"))
try {
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
index 8f0fbb3..43872e9 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -171,7 +171,7 @@ public class TestContainerStorageManager {
configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
- configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+ configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
Config config = new MapConfig(configMap);
Map<String, Serde<Object>> serdes = new HashMap<>();
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
index 69a2379..f2d4972 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTransactionalStateTaskStorageManager.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.Optional;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.SystemAdmin;
@@ -49,7 +50,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -272,7 +272,7 @@ public class TestTransactionalStateTaskStorageManager {
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
Path mockPath = mock(Path.class);
- when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+ when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
StorageEngine mockPStore = mock(StorageEngine.class);
StoreProperties pStoreProps = mock(StoreProperties.class);
@@ -309,14 +309,14 @@ public class TestTransactionalStateTaskStorageManager {
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
- tsm.checkpoint(offsets);
+ tsm.checkpoint(CheckpointId.create(), offsets);
// ensure that checkpoint is never called for non-logged persistent stores since they're
// always cleared on restart.
- verify(mockPStore, never()).checkpoint(anyString());
+ verify(mockPStore, never()).checkpoint(any());
// ensure that checkpoint is never called for in-memory stores since they're not persistent.
- verify(mockIStore, never()).checkpoint(anyString());
- verify(mockLIStore, never()).checkpoint(anyString());
+ verify(mockIStore, never()).checkpoint(any());
+ verify(mockLIStore, never()).checkpoint(any());
verify(tsm).writeChangelogOffsetFiles(checkpointPathsCaptor.capture(), any(), eq(offsets));
Map<String, Path> checkpointPaths = checkpointPathsCaptor.getValue();
assertEquals(1, checkpointPaths.size());
@@ -332,7 +332,7 @@ public class TestTransactionalStateTaskStorageManager {
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
- when(mockLPStore.checkpoint(anyString())).thenThrow(new IllegalStateException());
+ when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException());
java.util.Map<String, StorageEngine> taskStores =
ImmutableMap.of("loggedPersistentStore", mockLPStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -343,7 +343,7 @@ public class TestTransactionalStateTaskStorageManager {
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
- tsm.checkpoint(offsets);
+ tsm.checkpoint(CheckpointId.create(), offsets);
verify(tsm, never()).writeChangelogOffsetFiles(any(), any(), any());
fail("Should have thrown an exception if error creating store checkpoint");
}
@@ -358,7 +358,7 @@ public class TestTransactionalStateTaskStorageManager {
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
Path mockPath = mock(Path.class);
- when(mockLPStore.checkpoint(anyString())).thenReturn(Optional.of(mockPath));
+ when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
java.util.Map<String, StorageEngine> taskStores =
ImmutableMap.of("loggedPersistentStore", mockLPStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
@@ -371,7 +371,7 @@ public class TestTransactionalStateTaskStorageManager {
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
- tsm.checkpoint(offsets);
+ tsm.checkpoint(CheckpointId.create(), offsets);
fail("Should have thrown an exception if error writing offset file.");
}
diff --git a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
index d482c15..2d26d29 100644
--- a/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
+++ b/samza-kv-inmemory/src/main/scala/org/apache/samza/storage/kv/inmemory/InMemoryKeyValueStore.scala
@@ -25,6 +25,8 @@ import java.nio.file.Path
import java.util
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
+
/**
* In memory implementation of a key value store.
*
@@ -127,7 +129,7 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
}
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
// No checkpoint being persisted. State restores from Changelog.
Optional.empty()
}
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index a2ae8b0..300177a 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -21,15 +21,13 @@ package org.apache.samza.storage.kv
import java.io.File
import java.nio.file.{Path, Paths}
-import java.util
-import java.util.{Comparator, Optional}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.{Comparator, Optional}
-import org.apache.commons.io.FileUtils
-import org.apache.samza.{SamzaException, checkpoint}
+import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.config.Config
-import org.apache.samza.serializers.CheckpointSerde
import org.apache.samza.util.Logging
import org.rocksdb.{TtlDB, _}
@@ -239,9 +237,9 @@ class RocksDbKeyValueStore(
trace("Flushed store: %s" format storeName)
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
val checkpoint = Checkpoint.create(db)
- val checkpointPath = dir.getPath + "-" + id
+ val checkpointPath = dir.getPath + "-" + id.toString
checkpoint.createCheckpoint(checkpointPath)
Optional.of(Paths.get(checkpointPath))
}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
index 7e514e7..177a986 100644
--- a/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
+++ b/samza-kv/src/main/java/org/apache/samza/storage/kv/LargeMessageSafeStore.java
@@ -22,6 +22,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -126,7 +127,7 @@ public class LargeMessageSafeStore implements KeyValueStore<byte[], byte[]> {
}
@Override
- public Optional<Path> checkpoint(String id) {
+ public Optional<Path> checkpoint(CheckpointId id) {
return store.checkpoint(id);
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
index ace7aa5..8c32793 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
@@ -24,6 +24,7 @@ import java.nio.file.Path
import java.util
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.config.StorageConfig
import org.apache.samza.task.MessageCollector
import org.apache.samza.util.Logging
@@ -163,7 +164,7 @@ class AccessLoggedStore[K, V](
bytes
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
store.checkpoint(id)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 41d2d9f..5c1961c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -20,10 +20,13 @@
package org.apache.samza.storage.kv
import org.apache.samza.util.Logging
+
import scala.collection._
import java.nio.file.Path
import java.util.{Arrays, Optional}
+import org.apache.samza.checkpoint.CheckpointId
+
/**
* A write-behind caching layer around the rocksdb store. The purpose of this cache is three-fold:
* 1. Batch together writes to rocksdb, this turns out to be a great optimization
@@ -293,7 +296,7 @@ class CachedStore[K, V](
store.snapshot(from, to)
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
store.checkpoint(id)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 61ff059..bc6778e 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -29,6 +29,8 @@ import org.apache.samza.util.TimerUtil
import java.nio.file.Path
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
+
/**
* A key value store.
*
@@ -208,7 +210,7 @@ class KeyValueStorageEngine[K, V](
}
}
- def checkpoint(id: String): Optional[Path] = {
+ def checkpoint(id: CheckpointId): Optional[Path] = {
updateTimer(metrics.checkpointNs) {
trace("Checkpointing.")
metrics.checkpoints.inc
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
index 4c238bb..320e801 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/LoggedStore.scala
@@ -22,6 +22,7 @@ package org.apache.samza.storage.kv
import java.nio.file.Path
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.util.Logging
import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStreamPartition}
import org.apache.samza.task.MessageCollector
@@ -121,7 +122,7 @@ class LoggedStore[K, V](
store.snapshot(from, to)
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
store.checkpoint(id)
}
}
\ No newline at end of file
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3bc4674..8bb6fa2 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -22,6 +22,8 @@ package org.apache.samza.storage.kv
import java.nio.file.Path
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
+
import scala.collection.JavaConverters._
object NullSafeKeyValueStore {
@@ -99,7 +101,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
store.snapshot(from, to)
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
store.checkpoint(id)
}
}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index 169452c..5b3456c 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -21,6 +21,8 @@ package org.apache.samza.storage.kv
import java.nio.file.Path
import java.util.Optional
+
+import org.apache.samza.checkpoint.CheckpointId
import org.apache.samza.util.Logging
import org.apache.samza.serializers._
@@ -166,7 +168,7 @@ class SerializedKeyValueStore[K, V](
}
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
store.checkpoint(id)
}
}
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index c20c2c5..c0fc080 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -24,6 +24,8 @@ import java.util
import java.nio.file.Path
import java.util.Optional
+import org.apache.samza.checkpoint.CheckpointId
+
/**
* A mock key-value store wrapper that handles serialization
*/
@@ -76,7 +78,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
throw new UnsupportedOperationException("iterator() not supported")
}
- override def checkpoint(id: String): Optional[Path] = {
+ override def checkpoint(id: CheckpointId): Optional[Path] = {
Optional.empty()
}
}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
index 67a4de8..05e6737 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateIntegrationTest.java
@@ -89,7 +89,7 @@ public class TransactionalStateIntegrationTest extends StreamApplicationIntegrat
put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
- put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+ put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
} };
diff --git a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
index 0c678b6..41eb1ab 100644
--- a/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/storage/kv/TransactionalStateMultiStoreIntegrationTest.java
@@ -88,7 +88,7 @@ public class TransactionalStateMultiStoreIntegrationTest extends StreamApplicati
put(TaskConfig.CHECKPOINT_MANAGER_FACTORY, "org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory");
put(TaskConfig.COMMIT_MS, "-1"); // manual commit only
put(TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED, "true");
- put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_CHANGELOG_STATE, "true");
+ put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
put(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR(), "1");
put(JobConfig.JOB_LOGGED_STORE_BASE_DIR, LOGGED_STORE_BASE_DIR);
} };