You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/05/02 22:24:27 UTC

[samza] branch master updated: Adding tests for legacy store offset file (#1019)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new fca8437  Adding tests for legacy store offset file (#1019)
fca8437 is described below

commit fca84373f9d45d3409e1996f32d1070c08129813
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Thu May 2 15:24:22 2019 -0700

    Adding tests for legacy store offset file (#1019)
    
    * Adding parameterized tests for legacy-offset file
    
    * Adding test for checking precedence between offset files
---
 .../samza/storage/TestTaskStorageManager.scala     | 103 ++++++++++++++++-----
 1 file changed, 82 insertions(+), 21 deletions(-)

diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 3a4b2b5..8188851 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -35,6 +35,9 @@ import org.apache.samza.system._
 import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.{FileUtil, SystemClock}
 import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
 import org.junit.{After, Before, Test}
 import org.mockito.Matchers._
 import org.mockito.{Matchers, Mockito}
@@ -47,7 +50,14 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable.HashMap
 import scala.collection.mutable
 
-class TestTaskStorageManager extends MockitoSugar {
+/**
+  * This test is parameterized on the offsetFileName and is run for both
+  * StorageManagerUtil.OFFSET_FILE_NAME_LEGACY and StorageManagerUtil.OFFSET_FILE_NAME_NEW.
+  *
+  * @param offsetFileName the name of the offset file.
+  */
+@RunWith(value = classOf[Parameterized])
+class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
 
   val store = "store1"
   val loggedStore = "loggedStore1"
@@ -84,7 +94,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val ssp = new SystemStreamPartition(ss, partition)
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
-    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFile = new File(storeDirectory, offsetFileName)
 
     val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
 
@@ -125,14 +135,14 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 2: flush should update the offset file
     taskManager.flush()
     assertTrue(offsetFile.exists())
-    assertEquals("{\"kafka.testStream-loggedStore1.0\":\"50\"}", FileUtil.readWithChecksum(offsetFile))
+    validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "50")
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
     when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("0", "100", "101"))
     taskManager.stop()
     assertTrue(storeFile.exists())
     assertTrue(offsetFile.exists())
-    assertEquals("{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+    validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
     sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
@@ -151,7 +161,7 @@ class TestTaskStorageManager extends MockitoSugar {
       .setStreamMetadataCache(mockStreamMetadataCache)
       .setSSPMetadataCache(mockSSPMetadataCache)
       .setSystemAdmin("kafka", mockSystemAdmin)
-        .initializeContainerStorageManager()
+      .initializeContainerStorageManager()
       .build
 
     taskManager.init
@@ -263,7 +273,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -281,7 +291,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // is older than deletionRetention of the changeLog.
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     storeDirectory.setLastModified(0)
-    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFile = new File(storeDirectory, offsetFileName)
     offsetFile.createNewFile()
     FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
@@ -298,7 +308,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -315,7 +325,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
-    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFile = new File(storeDirectory, offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -345,7 +355,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFile.exists())
-    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+    validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")
   }
 
   /**
@@ -355,9 +365,9 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushCreatesOffsetFileForLoggedStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
     val anotherOffsetPath = new File(
-      StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+      StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -381,7 +391,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+    validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")
 
     assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
   }
@@ -393,7 +403,7 @@ class TestTaskStorageManager extends MockitoSugar {
   def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
     val sspMetadataCache = mock[SSPMetadataCache]
     val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -426,7 +436,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+    validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")
 
     //Invoke test method again
     taskStorageManager.flush()
@@ -440,7 +450,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
     val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
     FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -470,7 +480,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
+    validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "139")
 
     // Flush again
     when(sspMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("20", "193", "194"))
@@ -480,14 +490,32 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
+    validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "193")
+  }
+
+  /**
+    * Asserts that `offsetFile` records `offset` for `ssp`.
+    * The legacy file ([[StorageManagerUtil.OFFSET_FILE_NAME_LEGACY]]) holds only the bare offset string, while
+    * the new file ([[StorageManagerUtil.OFFSET_FILE_NAME_NEW]]) holds a JSON map from ssp to offset.
+    *
+    * @param ssp    key expected in the new-format JSON map (not used for the legacy format).
+    * @param offset offset value expected in the file.
+    * @throws IllegalArgumentException if the file name matches neither offset file name.
+    */
+  private def validateOffsetFileContents(offsetFile: File, ssp: String, offset: String): Unit = {
+    val fileName = offsetFile.getCanonicalFile.getName
+    val expected =
+      if (fileName == StorageManagerUtil.OFFSET_FILE_NAME_NEW) "{\"" + ssp + "\":\"" + offset + "\"}"
+      else if (fileName == StorageManagerUtil.OFFSET_FILE_NAME_LEGACY) offset
+      else throw new IllegalArgumentException("Invalid offset file name: " + fileName)
+    assertEquals("Found incorrect value in offset file!", expected, FileUtil.readWithChecksum(offsetFile))
   }
 
   @Test
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
-    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
 
 
     val sspMetadataCache = mock[SSPMetadataCache]
@@ -567,7 +595,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // Create a file in old single-offset format, with a sample offset
     val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
     val storeFile = new File(storeDirectory, "store.sst")
-    val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val offsetFile = new File(storeDirectory, offsetFileName)
     val sampleOldOffset = "912321"
     FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
 
@@ -578,6 +606,28 @@ class TestTaskStorageManager extends MockitoSugar {
     assertTrue(offsets.get(ssp).equals(sampleOldOffset))
   }
 
+  @Test
+  def testReadOfOffsetInCaseOfBothFilesPresent(): Unit = {
+    // Write the store's offset in both formats: the legacy file holds a bare offset, the new file a JSON map.
+    // This test does not depend on offsetFileName, so it runs identically for each parameter value.
+    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val legacyOffsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)
+    val newOffsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val sampleOldOffset = "100000001"
+    val sampleNewOffset = "{\"kafka.test-stream.0\":\"200000002\"}"
+    FileUtil.writeWithChecksum(legacyOffsetFile, sampleOldOffset)
+    FileUtil.writeWithChecksum(newOffsetFile, sampleNewOffset)
+    assertTrue(legacyOffsetFile.exists())
+    assertTrue(newOffsetFile.exists())
+
+    // Read the offset for the ssp; the value from the new-format file must take precedence over the legacy one
+    val ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
+    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
+
+    assertEquals(1, offsets.size())
+    assertEquals("200000002", offsets.get(ssp))
+  }
+
   private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
     val systemName = "kafka"
     val streamName = getStreamName(loggedStore)
@@ -590,7 +640,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeFile = new File(storeDirectory, "store.sst")
 
     if (writeOffsetFile) {
-      val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+      val offsetFile = new File(storeDirectory, offsetFileName)
       if (fileOffset != null) {
         FileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
@@ -711,6 +761,17 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 }
 
+object TestTaskStorageManager {
+
+  /** Constructor arguments for each parameterized run: the new offset file name first, then the legacy one. */
+  @Parameters
+  def parameters: util.Collection[Array[String]] =
+    Seq(StorageManagerUtil.OFFSET_FILE_NAME_NEW, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)
+      .map(fileName => Array(fileName))
+      .asJava
+}
+
+
 object TaskStorageManagerBuilder {
   val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "store")
   val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore")