You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by at...@apache.org on 2019/02/13 21:36:22 UTC

[samza] branch master updated: Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility

This is an automated email from the ASF dual-hosted git repository.

atoomula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 551192f  Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility
551192f is described below

commit 551192f562ea067072bb5a56adc5a1a63189b73b
Author: Ray Matharu <rm...@linkedin.com>
AuthorDate: Wed Feb 13 13:36:12 2019 -0800

    Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatibility
    
    This consolidates the two different kinds of offsets used by stores and side-inputs into one.
    The code can still read and make sense of store-offset files written in the old format.
    The new format stores a map of ssp to offset, rather than a singular string.
    
    Updated tests.
    And added tests to ensure the reading of the old-format offset still works.
    
    Author: Ray Matharu <rm...@linkedin.com>
    
    Reviewers: mynameborat
    
    Closes #915 from rmatharu/test-offset and squashes the following commits:
    
    c20e94c4 [Ray Matharu] minor
    403f72b1 [Ray Matharu] simplifying code
    b5868066 [Ray Matharu] minor
    ca648223 [Ray Matharu] gradle.props
    b50e5fe1 [Ray Matharu] Applying comments
    75905b0c [Ray Matharu] minor
    a8d1dcdc [Ray Matharu] Addressing review comments
    1f7ecdcc [Ray Matharu] minor
    666157ad [Ray Matharu] test fix
    d621a51c [Ray Matharu] Adding test for read of old format
    b9e58997 [Ray Matharu] build fix
    9cb822ca [Ray Matharu] bug fix
    5484e430 [Ray Matharu] Consolidating writing of offsets
    55f35b7b [Ray Matharu] minor
    88963a8c [Ray Matharu] minor
    f14ee03b [Ray Matharu] Consolidating read of offset file
    12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza
    2c09b081 [Ray Matharu] Rocksdb bug fix
---
 .../apache/samza/storage/StorageManagerUtil.java   | 78 ++++++++++++++++++----
 .../samza/storage/TaskSideInputStorageManager.java | 27 +++-----
 .../samza/storage/ContainerStorageManager.java     | 29 ++++----
 .../apache/samza/storage/TaskStorageManager.scala  | 10 +--
 .../samza/storage/TestTaskStorageManager.scala     | 53 +++++++++------
 .../apache/samza/monitor/LocalStoreMonitor.java    |  2 +-
 .../samza/monitor/TestLocalStoreMonitor.java       |  3 +-
 7 files changed, 127 insertions(+), 75 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
index 1f0749c..79e67e1 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
@@ -21,16 +21,34 @@ package org.apache.samza.storage;
 
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import java.util.stream.Collectors;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.FileUtil;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 public class StorageManagerUtil {
   private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
+  public static final String OFFSET_FILE_NAME = "OFFSET";
+  private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
+  private static final TypeReference<Map<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
+            new TypeReference<Map<SystemStreamPartition, String>>() { };
+  private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
+
 
   /**
    * Fetch the starting offset for the input {@link SystemStreamPartition}
@@ -70,17 +88,15 @@ public class StorageManagerUtil {
    * the {@code storeDeleteRetentionInMs}, then the store is considered stale.
    *
    * @param storeDir the base directory of the store
-   * @param offsetFileName the offset file name
    * @param storeDeleteRetentionInMs store delete retention in millis
    * @param currentTimeMs current time in ms
    * @return true if the store is stale, false otherwise
    */
-  public static boolean isStaleStore(
-      File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) {
+  public static boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs) {
     boolean isStaleStore = false;
     String storePath = storeDir.toPath().toString();
     if (storeDir.exists()) {
-      File offsetFileRef = new File(storeDir, offsetFileName);
+      File offsetFileRef = new File(storeDir, OFFSET_FILE_NAME);
       long offsetFileLastModifiedTime = offsetFileRef.lastModified();
       if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) {
         LOG.info(
@@ -98,14 +114,14 @@ public class StorageManagerUtil {
    * An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty.
    *
    * @param storeDir the base directory of the store
-   * @param offsetFileName name of the offset file
+   * @param storeSSPs storeSSPs (if any) associated with the store
    * @return true if the offset file is valid. false otherwise.
    */
-  public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
+  public static boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs) {
     boolean hasValidOffsetFile = false;
     if (storeDir.exists()) {
-      String offsetContents = readOffsetFile(storeDir, offsetFileName);
-      if (offsetContents != null && !offsetContents.isEmpty()) {
+      Map<SystemStreamPartition, String> offsetContents = readOffsetFile(storeDir, storeSSPs);
+      if (offsetContents != null && !offsetContents.isEmpty() && offsetContents.keySet().equals(storeSSPs)) {
         hasValidOffsetFile = true;
       } else {
         LOG.info("Offset file is not valid for store: {}.", storeDir.toPath());
@@ -116,6 +132,34 @@ public class StorageManagerUtil {
   }
 
   /**
+   * Write the given SSP-Offset map into the offsets file.
+   * @param storeBaseDir the base directory of the store
+   * @param storeName the store name to use
+   * @param taskName the task name which is referencing the store
+   * @param offsets The SSP-offset to write
+   * @throws IOException if the offsets cannot be serialized to JSON
+   */
+  public static void writeOffsetFile(File storeBaseDir, String storeName, TaskName taskName,
+      Map<SystemStreamPartition, String> offsets) throws IOException {
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
+    String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
+    FileUtil.writeWithChecksum(offsetFile, fileContents);
+  }
+
+  /**
+   *  Delete the offset file for this task and store, if one exists.
+   * @param storeBaseDir the base directory of the store
+   * @param storeName the store name to use
+   * @param taskName the task name which is referencing the store
+   */
+  public static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) {
+    File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
+    if (offsetFile.exists()) {
+      FileUtil.rm(offsetFile);
+    }
+  }
+
+  /**
    * Check if a store's disk footprint exists.
    *
    * @param storeDir the base directory of the store
@@ -129,18 +173,24 @@ public class StorageManagerUtil {
    * Read and return the contents of the offset file.
    *
    * @param storagePartitionDir the base directory of the store
-   * @param offsetFileName name of the offset file
+   * @param storeSSPs SSPs associated with the store (if any)
    * @return the content of the offset file if it exists for the store, null otherwise.
    */
-  public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
-    String offset = null;
-    File offsetFileRef = new File(storagePartitionDir, offsetFileName);
+  public static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs) {
+    Map<SystemStreamPartition, String> offsets = new HashMap<>();
+    String fileContents = null;
+    File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
     String storePath = storagePartitionDir.getPath();
 
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
       try {
-        offset = FileUtil.readWithChecksum(offsetFileRef);
+        fileContents = FileUtil.readWithChecksum(offsetFileRef);
+        offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+      } catch (JsonParseException | JsonMappingException e) {
+        LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
+        final String finalFileContents = fileContents;
+        offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(ssp -> ssp, offset -> finalFileContents)) : offsets;
       } catch (Exception e) {
         LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e);
       }
@@ -148,7 +198,7 @@ public class StorageManagerUtil {
       LOG.info("No offset file found in storage partition directory: {}", storePath);
     }
 
-    return offset;
+    return offsets;
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
index 3982623..2b5717a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
@@ -39,7 +39,6 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -51,9 +50,6 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.FileUtil;
 
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
@@ -65,17 +61,12 @@ import scala.collection.JavaConverters;
  */
 public class TaskSideInputStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
-  private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
   private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
-  private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
-  private static final TypeReference<HashMap<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
-      new TypeReference<HashMap<SystemStreamPartition, String>>() { };
-  private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
 
   private final Clock clock;
   private final Map<String, SideInputsProcessor> storeToProcessor;
   private final Map<String, StorageEngine> stores;
-  private final String storeBaseDir;
+  private final File storeBaseDir;
   private final Map<String, Set<SystemStreamPartition>> storeToSSps;
   private final Map<SystemStreamPartition, Set<String>> sspsToStores;
   private final StreamMetadataCache streamMetadataCache;
@@ -97,7 +88,7 @@ public class TaskSideInputStorageManager {
       Clock clock) {
     this.clock = clock;
     this.stores = sideInputStores;
-    this.storeBaseDir = storeBaseDir;
+    this.storeBaseDir = new File(storeBaseDir);
     this.storeToSSps = storesToSSPs;
     this.streamMetadataCache = streamMetadataCache;
     this.systemAdmins = systemAdmins;
@@ -258,9 +249,7 @@ public class TaskSideInputStorageManager {
               .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
 
             try {
-              String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
-              File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
-              FileUtil.writeWithChecksum(offsetFile, fileContents);
+              StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, taskName, offsets);
             } catch (Exception e) {
               throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
             }
@@ -284,8 +273,8 @@ public class TaskSideInputStorageManager {
         File storeLocation = getStoreLocation(storeName);
         if (isValidSideInputStore(storeName, storeLocation)) {
           try {
-            String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
-            Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
+
+            Map<SystemStreamPartition, String> offsets = StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName));
             fileOffsets.putAll(offsets);
           } catch (Exception e) {
             LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
@@ -298,7 +287,7 @@ public class TaskSideInputStorageManager {
 
   @VisibleForTesting
   File getStoreLocation(String storeName) {
-    return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
+    return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, taskName);
   }
 
   /**
@@ -368,8 +357,8 @@ public class TaskSideInputStorageManager {
 
   private boolean isValidSideInputStore(String storeName, File storeLocation) {
     return isPersistedStore(storeName)
-        && !StorageManagerUtil.isStaleStore(storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
-        && StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE);
+        && !StorageManagerUtil.isStaleStore(storeLocation, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
+        && StorageManagerUtil.isOffsetFileValid(storeLocation, storeToSSps.get(storeName));
   }
 
   private boolean isPersistedStore(String storeName) {
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
index b896267..3b86a4e 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -23,6 +23,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.File;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -466,8 +467,6 @@ public class ContainerStorageManager {
    * with the respective consumer, restoring stores, and stopping stores.
    */
   private class TaskRestoreManager {
-
-    private final static String OFFSET_FILE_NAME = "OFFSET";
     private final Map<String, StorageEngine> taskStores; // Map of all StorageEngines for this task indexed by store name
     private final Set<String> taskStoresToRestore;
     // Set of store names which need to be restored by consuming using system-consumers (see registerStartingOffsets)
@@ -532,14 +531,14 @@ public class ContainerStorageManager {
             LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
             FileUtil.rm(loggedStorePartitionDir);
           } else {
-            String offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, OFFSET_FILE_NAME);
-            LOG.info("Read offset " + offset + " for the store " + storeName + " from logged storage partition directory "
-                + loggedStorePartitionDir);
-
-            if (offset != null) {
-              fileOffsets.put(
-                  new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()),
-                  offset);
+
+            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+            Map<SystemStreamPartition, String> offset =
+                StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP));
+            LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
+
+            if (offset.containsKey(changelogSSP)) {
+              fileOffsets.put(changelogSSP, offset.get(changelogSSP));
             }
           }
         });
@@ -562,9 +561,13 @@ public class ContainerStorageManager {
             (long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
       }
 
-      return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()
-          && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, OFFSET_FILE_NAME) && !StorageManagerUtil.isStaleStore(
-          loggedStoreDir, OFFSET_FILE_NAME, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
+      if (changelogSystemStreams.containsKey(storeName)) {
+        SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+        return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP))
+            && !StorageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
+      }
+
+      return false;
     }
 
     /**
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index f2c4679..85cbf53 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -45,8 +45,6 @@ class TaskStorageManager(
     case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk
   }
 
-  val offsetFileName = "OFFSET"
-
   def getStore(storeName: String): Option[StorageEngine] =  JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
 
   def init {
@@ -90,17 +88,13 @@ class TaskStorageManager(
         val newestOffset = if (sspMetadata == null) null else sspMetadata.getNewestOffset
         debug("Got offset %s for store %s" format(newestOffset, storeName))
 
-        val loggedStorePartitionDir = StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
-        val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
         if (newestOffset != null) {
           debug("Storing offset for store in OFFSET file ")
-          FileUtil.writeWithChecksum(offsetFile, newestOffset)
+          StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, taskName, Map(ssp -> newestOffset).asJava)
           debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
         } else {
           //if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
-          if (offsetFile.exists()) {
-            FileUtil.rm(offsetFile)
-          }
+          StorageManagerUtil.deleteOffsetFile(loggedStoreBaseDir, storeName, taskName);
           debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
       } catch {
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index a7a9c8b..8e22533 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -22,7 +22,6 @@ package org.apache.samza.storage
 
 import java.io.{File, FileOutputStream, ObjectOutputStream}
 import java.util
-import java.util.Optional
 
 import org.apache.samza.Partition
 import org.apache.samza.config._
@@ -81,7 +80,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val ssp = new SystemStreamPartition(ss, partition)
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
     val storeFile = new File(storeDirectory, "store.sst")
-    val offsetFile = new File(storeDirectory, "OFFSET")
+    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
 
     val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
 
@@ -122,14 +121,14 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 2: flush should update the offset file
     taskManager.flush()
     assertTrue(offsetFile.exists())
-    assertEquals("50", FileUtil.readWithChecksum(offsetFile))
+    assertEquals("{\"kafka.testStream.0\":\"50\"}", FileUtil.readWithChecksum(offsetFile))
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
     when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("0", "100", "101"))
     taskManager.stop()
     assertTrue(storeFile.exists())
     assertTrue(offsetFile.exists())
-    assertEquals("100", FileUtil.readWithChecksum(offsetFile))
+    assertEquals("{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
     sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
@@ -260,7 +259,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -278,7 +277,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // is older than deletionRetention of the changeLog.
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
     storeDirectory.setLastModified(0)
-    val offsetFile = new File(storeDirectory, "OFFSET")
+    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
     offsetFile.createNewFile()
     FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
@@ -295,7 +294,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -312,7 +311,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
-    val offsetFile = new File(storeDirectory, "OFFSET")
+    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -342,7 +341,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFile.exists())
-    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFile))
+    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
   }
 
   /**
@@ -352,10 +351,10 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
     val anotherOffsetPath = new File(
       StorageManagerUtil.getStorePartitionDir(
-        TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + "OFFSET")
+        TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -386,7 +385,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
 
     assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
   }
@@ -398,7 +397,7 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -431,7 +430,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
 
     //Invoke test method again
     taskStorageManager.flush()
@@ -445,7 +444,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition("kafka", "testStream", partition)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -475,7 +474,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "139", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
 
     // Flush again
     when(sspMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("20", "193", "194"))
@@ -485,14 +484,14 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "193", FileUtil.readWithChecksum(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
   }
 
   @Test
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME)
 
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -567,6 +566,22 @@ class TestTaskStorageManager extends MockitoSugar {
     testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
   }
 
+  @Test
+  def testReadOfOldOffsetFormat(): Unit = {
+    // Create a file in old single-offset format, with a sample offset
+    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+    val storeFile = new File(storeDirectory, "store.sst")
+    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
+    val sampleOldOffset = "912321"
+    FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
+
+
+    // read offset against a given ssp from the file
+    var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
+    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava)
+    assertTrue(offsets.get(ssp).equals(sampleOldOffset))
+  }
+
   private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
     val systemName = "kafka"
     val streamName = "testStream"
@@ -579,7 +594,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeFile = new File(storeDirectory, "store.sst")
 
     if (writeOffsetFile) {
-      val offsetFile = new File(storeDirectory, "OFFSET")
+      val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME)
       if (fileOffset != null) {
         FileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
index 5ac8f9a..9e1daaf 100644
--- a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
+++ b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
@@ -51,7 +51,7 @@ public class LocalStoreMonitor implements Monitor {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalStoreMonitor.class);
 
-  private static final String OFFSET_FILE_NAME = "OFFSET";
+  private static final String OFFSET_FILE_NAME = StorageManagerUtil.OFFSET_FILE_NAME;
 
   private final JobsClient jobsClient;
 
diff --git a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
index e669ab9..f297a11 100644
--- a/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
+++ b/samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.job.JobInstance;
+import org.apache.samza.storage.StorageManagerUtil;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
 import org.junit.Assert;
@@ -195,7 +196,7 @@ public class TestLocalStoreMonitor {
   }
 
   private static File createOffsetFile(File taskStoreDir) throws Exception {
-    File offsetFile = new File(taskStoreDir, "OFFSET");
+    File offsetFile = new File(taskStoreDir, StorageManagerUtil.OFFSET_FILE_NAME);
     offsetFile.createNewFile();
     return offsetFile;
   }