You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/13 21:36:22 UTC
[samza] branch master updated: Consolidating offset read and write
for store-offsets and side-inputs, maintaining backward compatibility
This is an automated email from the ASF dual-hosted git repository.
atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 551192f Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility
551192f is described below
commit 551192f562ea067072bb5a56adc5a1a63189b73b
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Wed Feb 13 13:36:12 2019 -0800
Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility
This consolidates the two different kinds of offsets used by stores and side-inputs into one.
The code can still read and make sense of store-offset files written in the old format.
The new format stores a map of ssp to offset, rather than a singular string.
Updated the existing tests, and added a test to ensure that
reading an offset file written in the old format still works.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: mynameborat
Closes #915 from rmatharu/test-offset and squashes the following commits:
c20e94c4 [Ray Matharu] minor
403f72b1 [Ray Matharu] simplifying code
b5868066 [Ray Matharu] minor
ca648223 [Ray Matharu] gradle.props
b50e5fe1 [Ray Matharu] Applying comments
75905b0c [Ray Matharu] minor
a8d1dcdc [Ray Matharu] Addressing review comments
1f7ecdcc [Ray Matharu] minor
666157ad [Ray Matharu] test fix
d621a51c [Ray Matharu] Adding test for read of old format
b9e58997 [Ray Matharu] build fix
9cb822ca [Ray Matharu] bug fix
5484e430 [Ray Matharu] Consolidating writing of offsets
55f35b7b [Ray Matharu] minor
88963a8c [Ray Matharu] minor
f14ee03b [Ray Matharu] Consolidating read of offset file
12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
2c09b081 [Ray Matharu] Rocksdb bug fix
---
.../apache/samza/storage/StorageManagerUtil.java | 78 ++++++++++++++++++----
.../samza/storage/TaskSideInputStorageManager.java | 27 +++-----
.../samza/storage/ContainerStorageManager.java | 29 ++++----
.../apache/samza/storage/TaskStorageManager.scala | 10 +--
.../samza/storage/TestTaskStorageManager.scala | 53 +++++++++------
.../apache/samza/monitor/LocalStoreMonitor.java | 2 +-
.../samza/monitor/TestLocalStoreMonitor.java | 3 +-
7 files changed, 127 insertions(+), 75 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 1f0749c..79e67e1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -21,16 +21,34 @@ package org.apache.samza.storage;
import com.google.common.collect.ImmutableMap;
import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.stream.Collectors;
import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.FileUtil;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StorageManagerUtil {
private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
+ public static final String OFFSET_FILE_NAME = "OFFSET";
+ private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
+ private static final TypeReference<Map<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
+ new TypeReference<Map<SystemStreamPartition, String>>() { };
+ private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
+
/**
* Fetch the starting offset for the input {@link SystemStreamPartition}
@@ -70,17 +88,15 @@ public class StorageManagerUtil {
* the {@code storeDeleteRetentionInMs}, then the store is considered stale.
*
* @param storeDir the base directory of the store
- * @param offsetFileName the offset file name
* @param storeDeleteRetentionInMs store delete retention in millis
* @param currentTimeMs current time in ms
* @return true if the store is stale, false otherwise
*/
- public static boolean isStaleStore(
- File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) {
+ public static boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs) {
boolean isStaleStore = false;
String storePath = storeDir.toPath().toString();
if (storeDir.exists()) {
- File offsetFileRef = new File(storeDir, offsetFileName);
+ File offsetFileRef = new File(storeDir, OFFSET_FILE_NAME);
long offsetFileLastModifiedTime = offsetFileRef.lastModified();
if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) {
LOG.info(
@@ -98,14 +114,14 @@ public class StorageManagerUtil {
* An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty.
*
* @param storeDir the base directory of the store
- * @param offsetFileName name of the offset file
+ * @param storeSSPs storeSSPs (if any) associated with the store
* @return true if the offset file is valid. false otherwise.
*/
- public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
+ public static boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs) {
boolean hasValidOffsetFile = false;
if (storeDir.exists()) {
- String offsetContents = readOffsetFile(storeDir, offsetFileName);
- if (offsetContents != null && !offsetContents.isEmpty()) {
+ Map<SystemStreamPartition, String> offsetContents = readOffsetFile(storeDir, storeSSPs);
+ if (offsetContents != null && !offsetContents.isEmpty() && offsetContents.keySet().equals(storeSSPs)) {
hasValidOffsetFile = true;
} else {
LOG.info("Offset file is not valid for store: {}.", storeDir.toPath());
@@ -116,6 +132,34 @@ public class StorageManagerUtil {
}
/**
+ * Write the given SSP-Offset map into the offsets file.
+ * @param storeBaseDir the base directory of the store
+ * @param storeName the store name to use
+ * @param taskName the task name which is referencing the store
+ * @param offsets The SSP-offset to write
+ * @throws IOException if serializing the offsets to json or writing the offset file fails
+ */
+ public static void writeOffsetFile(File storeBaseDir, String storeName, TaskName taskName,
+ Map<SystemStreamPartition, String> offsets) throws IOException {
+ File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
+ String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
+ FileUtil.writeWithChecksum(offsetFile, fileContents);
+ }
+
+ /**
+ * Delete the offset file for this task and store, if one exists.
+ * @param storeBaseDir the base directory of the store
+ * @param storeName the store name to use
+ * @param taskName the task name which is referencing the store
+ */
+ public static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) {
+ File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
+ if (offsetFile.exists()) {
+ FileUtil.rm(offsetFile);
+ }
+ }
+
+ /**
* Check if a store's disk footprint exists.
*
* @param storeDir the base directory of the store
@@ -129,18 +173,24 @@ public class StorageManagerUtil {
* Read and return the contents of the offset file.
*
* @param storagePartitionDir the base directory of the store
- * @param offsetFileName name of the offset file
+ * @param storeSSPs SSPs associated with the store (if any)
* @return the content of the offset file if it exists for the store, null otherwise.
*/
- public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
- String offset = null;
- File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+ public static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs) {
+ Map<SystemStreamPartition, String> offsets = new HashMap<>();
+ String fileContents = null;
+ File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
String storePath = storagePartitionDir.getPath();
if (offsetFileRef.exists()) {
LOG.info("Found offset file in storage partition directory: {}", storePath);
try {
- offset = FileUtil.readWithChecksum(offsetFileRef);
+ fileContents = FileUtil.readWithChecksum(offsetFileRef);
+ offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+ } catch (JsonParseException | JsonMappingException e) {
+ LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+ final String finalFileContents = fileContents;
+ offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(ssp -> ssp, offset -> finalFileContents)) : offsets;
} catch (Exception e) {
LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e);
}
@@ -148,7 +198,7 @@ public class StorageManagerUtil {
LOG.info("No offset file found in storage partition directory: {}", storePath);
}
- return offset;
+ return offsets;
}
/**
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 3982623..2b5717a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -39,7 +39,6 @@ import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.container.TaskName;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -51,9 +50,6 @@ import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.FileUtil;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
@@ -65,17 +61,12 @@ import scala.collection.JavaConverters;
*/
public class TaskSideInputStorageManager {
private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
- private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
- private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
- private static final TypeReference<HashMap<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
- new TypeReference<HashMap<SystemStreamPartition, String>>() { };
- private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
private final Clock clock;
private final Map<String, SideInputsProcessor> storeToProcessor;
private final Map<String, StorageEngine> stores;
- private final String storeBaseDir;
+ private final File storeBaseDir;
private final Map<String, Set<SystemStreamPartition>> storeToSSps;
private final Map<SystemStreamPartition, Set<String>> sspsToStores;
private final StreamMetadataCache streamMetadataCache;
@@ -97,7 +88,7 @@ public class TaskSideInputStorageManager {
Clock clock) {
this.clock = clock;
this.stores = sideInputStores;
- this.storeBaseDir = storeBaseDir;
+ this.storeBaseDir = new File(storeBaseDir);
this.storeToSSps = storesToSSPs;
this.streamMetadataCache = streamMetadataCache;
this.systemAdmins = systemAdmins;
@@ -258,9 +249,7 @@ public class TaskSideInputStorageManager {
.collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
try {
- String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
- File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
- FileUtil.writeWithChecksum(offsetFile, fileContents);
+ StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, taskName, offsets);
} catch (Exception e) {
throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
}
@@ -284,8 +273,8 @@ public class TaskSideInputStorageManager {
File storeLocation = getStoreLocation(storeName);
if (isValidSideInputStore(storeName, storeLocation)) {
try {
- String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
- Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+
+ Map<SystemStreamPartition, String> offsets = StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName));
fileOffsets.putAll(offsets);
} catch (Exception e) {
LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
@@ -298,7 +287,7 @@ public class TaskSideInputStorageManager {
@VisibleForTesting
File getStoreLocation(String storeName) {
- return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
+ return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, taskName);
}
/**
@@ -368,8 +357,8 @@ public class TaskSideInputStorageManager {
private boolean isValidSideInputStore(String storeName, File storeLocation) {
return isPersistedStore(storeName)
- && !StorageManagerUtil.isStaleStore(storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
- && StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE);
+ && !StorageManagerUtil.isStaleStore(storeLocation, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
+ && StorageManagerUtil.isOffsetFileValid(storeLocation, storeToSSps.get(storeName));
}
private boolean isPersistedStore(String storeName) {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index b896267..3b86a4e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -466,8 +467,6 @@ public class ContainerStorageManager {
* with the respective consumer, restoring stores, and stopping stores.
*/
private class TaskRestoreManager {
-
- private final static String OFFSET_FILE_NAME = "OFFSET";
private final Map<String, StorageEngine> taskStores; // Map of all StorageEngines for this task indexed by store name
private final Set<String> taskStoresToRestore;
// Set of store names which need to be restored by consuming using system-consumers (see registerStartingOffsets)
@@ -532,14 +531,14 @@ public class ContainerStorageManager {
LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
FileUtil.rm(loggedStorePartitionDir);
} else {
- String offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, OFFSET_FILE_NAME);
- LOG.info("Read offset " + offset + " for the store " + storeName + " from logged storage partition directory "
- + loggedStorePartitionDir);
-
- if (offset != null) {
- fileOffsets.put(
- new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()),
- offset);
+
+ SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+ Map<SystemStreamPartition, String> offset =
+ StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP));
+ LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
+
+ if (offset.containsKey(changelogSSP)) {
+ fileOffsets.put(changelogSSP, offset.get(changelogSSP));
}
}
});
@@ -562,9 +561,13 @@ public class ContainerStorageManager {
(long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
}
- return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()
- && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, OFFSET_FILE_NAME) && !StorageManagerUtil.isStaleStore(
- loggedStoreDir, OFFSET_FILE_NAME, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
+ if (changelogSystemStreams.containsKey(storeName)) {
+ SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+ return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP))
+ && !StorageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
+ }
+
+ return false;
}
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index f2c4679..85cbf53 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -45,8 +45,6 @@ class TaskStorageManager(
case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk
}
- val offsetFileName = "OFFSET"
-
def getStore(storeName: String): Option[StorageEngine] = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
def init {
@@ -90,17 +88,13 @@ class TaskStorageManager(
val newestOffset = if (sspMetadata == null) null else sspMetadata.getNewestOffset
debug("Got offset %s for store %s" format(newestOffset, storeName))
- val loggedStorePartitionDir = StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
- val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
if (newestOffset != null) {
debug("Storing offset for store in OFFSET file ")
- FileUtil.writeWithChecksum(offsetFile, newestOffset)
+ StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, taskName, Map(ssp -> newestOffset).asJava)
debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
} else {
//if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
- if (offsetFile.exists()) {
- FileUtil.rm(offsetFile)
- }
+ StorageManagerUtil.deleteOffsetFile(loggedStoreBaseDir, storeName, taskName);
debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
}
} catch {
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index a7a9c8b..8e22533 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -22,7 +22,6 @@ package org.apache.samza.storage
import java.io.{File, FileOutputStream, ObjectOutputStream}
import java.util
-import java.util.Optional
import org.apache.samza.Partition
import org.apache.samza.config._
@@ -81,7 +80,7 @@ class TestTaskStorageManager extends MockitoSugar {
val ssp = new SystemStreamPartition(ss, partition)
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
val storeFile = new File(storeDirectory, "store.sst")
- val offsetFile = new File(storeDirectory, "OFFSET")
+ val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
@@ -122,14 +121,14 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 2: flush should update the offset file
taskManager.flush()
assertTrue(offsetFile.exists())
- assertEquals("50", FileUtil.readWithChecksum(offsetFile))
+ assertEquals("{\"kafka.testStream.0\":\"50\"}", FileUtil.readWithChecksum(offsetFile))
// Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("0", "100", "101"))
taskManager.stop()
assertTrue(storeFile.exists())
assertTrue(offsetFile.exists())
- assertEquals("100", FileUtil.readWithChecksum(offsetFile))
+ assertEquals("{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
// Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
@@ -260,7 +259,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -278,7 +277,7 @@ class TestTaskStorageManager extends MockitoSugar {
// is older than deletionRetention of the changeLog.
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
storeDirectory.setLastModified(0)
- val offsetFile = new File(storeDirectory, "OFFSET")
+ val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
offsetFile.createNewFile()
FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
offsetFile.setLastModified(0)
@@ -295,7 +294,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -312,7 +311,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
- val offsetFile = new File(storeDirectory, "OFFSET")
+ val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -342,7 +341,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFile.exists())
- assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFile))
+ assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
}
/**
@@ -352,10 +351,10 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushCreatesOffsetFileForLoggedStore() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
val anotherOffsetPath = new File(
StorageManagerUtil.getStorePartitionDir(
- TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + "OFFSET")
+ TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -386,7 +385,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
}
@@ -398,7 +397,7 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -431,7 +430,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
//Invoke test method again
taskStorageManager.flush()
@@ -445,7 +444,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val ssp = new SystemStreamPartition("kafka", "testStream", partition)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val sspMetadataCache = mock[SSPMetadataCache]
@@ -475,7 +474,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "139", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
// Flush again
when(sspMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("20", "193", "194"))
@@ -485,14 +484,14 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "193", FileUtil.readWithChecksum(offsetFilePath))
+ assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
}
@Test
def testStopShouldNotCreateOffsetFileForEmptyStore() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
val sspMetadataCache = mock[SSPMetadataCache]
@@ -567,6 +566,22 @@ class TestTaskStorageManager extends MockitoSugar {
testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
}
+ @Test
+ def testReadOfOldOffsetFormat(): Unit = {
+ // Create a file in old single-offset format, with a sample offset
+ val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+ val storeFile = new File(storeDirectory, "store.sst")
+ val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
+ val sampleOldOffset = "912321"
+ FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
+
+
+ // read offset against a given ssp from the file
+ var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
+ val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava)
+ assertTrue(offsets.get(ssp).equals(sampleOldOffset))
+ }
+
private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
val systemName = "kafka"
val streamName = "testStream"
@@ -579,7 +594,7 @@ class TestTaskStorageManager extends MockitoSugar {
val storeFile = new File(storeDirectory, "store.sst")
if (writeOffsetFile) {
- val offsetFile = new File(storeDirectory, "OFFSET")
+ val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
if (fileOffset != null) {
FileUtil.writeWithChecksum(offsetFile, fileOffset)
} else {
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 5ac8f9a..9e1daaf 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -51,7 +51,7 @@ public class LocalStoreMonitor implements Monitor {
private static final Logger LOG = LoggerFactory.getLogger(LocalStoreMonitor.class);
- private static final String OFFSET_FILE_NAME = "OFFSET";
+ private static final String OFFSET_FILE_NAME = StorageManagerUtil.OFFSET_FILE_NAME;
private final JobsClient jobsClient;
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index e669ab9..f297a11 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.Assert;
@@ -195,7 +196,7 @@ public class TestLocalStoreMonitor {
}
private static File createOffsetFile(File taskStoreDir) throws Exception {
- File offsetFile = new File(taskStoreDir, "OFFSET");
+ File offsetFile = new File(taskStoreDir, StorageManagerUtil.OFFSET_FILE_NAME);
offsetFile.createNewFile();
return offsetFile;
}