You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on "here") was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by ca...@apache.org on 2019/05/02 22:24:27 UTC
[samza] branch master updated: Adding tests for legacy store offset
file (#1019)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new fca8437 Adding tests for legacy store offset file (#1019)
fca8437 is described below
commit fca84373f9d45d3409e1996f32d1070c08129813
Author: rmatharu <40...@users.noreply.github.com>
AuthorDate: Thu May 2 15:24:22 2019 -0700
Adding tests for legacy store offset file (#1019)
* Adding parameterized tests for legacy-offset file
* Adding test for checking precedence between offset files
---
.../samza/storage/TestTaskStorageManager.scala | 103 ++++++++++++++++-----
1 file changed, 82 insertions(+), 21 deletions(-)
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index 3a4b2b5..8188851 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -35,6 +35,9 @@ import org.apache.samza.system._
import org.apache.samza.task.TaskInstanceCollector
import org.apache.samza.util.{FileUtil, SystemClock}
import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
import org.junit.{After, Before, Test}
import org.mockito.Matchers._
import org.mockito.{Matchers, Mockito}
@@ -47,7 +50,14 @@ import scala.collection.JavaConverters._
import scala.collection.immutable.HashMap
import scala.collection.mutable
-class TestTaskStorageManager extends MockitoSugar {
+/**
+ * This test is parameterized on the offsetFileName and is run for both
+ * StorageManagerUtil.OFFSET_FILE_NAME_LEGACY and StorageManagerUtil.OFFSET_FILE_NAME_NEW.
+ *
+ * @param offsetFileName the name of the offset file.
+ */
+@RunWith(value = classOf[Parameterized])
+class TestTaskStorageManager(offsetFileName: String) extends MockitoSugar {
val store = "store1"
val loggedStore = "loggedStore1"
@@ -84,7 +94,7 @@ class TestTaskStorageManager extends MockitoSugar {
val ssp = new SystemStreamPartition(ss, partition)
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
- val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFile = new File(storeDirectory, offsetFileName)
val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
@@ -125,14 +135,14 @@ class TestTaskStorageManager extends MockitoSugar {
// Test 2: flush should update the offset file
taskManager.flush()
assertTrue(offsetFile.exists())
- assertEquals("{\"kafka.testStream-loggedStore1.0\":\"50\"}", FileUtil.readWithChecksum(offsetFile))
+ validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "50")
// Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
when(mockSSPMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("0", "100", "101"))
taskManager.stop()
assertTrue(storeFile.exists())
assertTrue(offsetFile.exists())
- assertEquals("{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+ validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")
// Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151")
@@ -151,7 +161,7 @@ class TestTaskStorageManager extends MockitoSugar {
.setStreamMetadataCache(mockStreamMetadataCache)
.setSSPMetadataCache(mockSSPMetadataCache)
.setSystemAdmin("kafka", mockSystemAdmin)
- .initializeContainerStorageManager()
+ .initializeContainerStorageManager()
.build
taskManager.init
@@ -263,7 +273,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -281,7 +291,7 @@ class TestTaskStorageManager extends MockitoSugar {
// is older than deletionRetention of the changeLog.
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
storeDirectory.setLastModified(0)
- val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFile = new File(storeDirectory, offsetFileName)
offsetFile.createNewFile()
FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
offsetFile.setLastModified(0)
@@ -298,7 +308,7 @@ class TestTaskStorageManager extends MockitoSugar {
@Test
def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val taskStorageManager = new TaskStorageManagerBuilder()
@@ -315,7 +325,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
- val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFile = new File(storeDirectory, offsetFileName)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -345,7 +355,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFile.exists())
- assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFile))
+ validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "100")
}
/**
@@ -355,9 +365,9 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushCreatesOffsetFileForLoggedStore() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
val anotherOffsetPath = new File(
- StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101")
@@ -381,7 +391,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+ validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")
assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
}
@@ -393,7 +403,7 @@ class TestTaskStorageManager extends MockitoSugar {
def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
val sspMetadataCache = mock[SSPMetadataCache]
val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101")
@@ -426,7 +436,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"100\"}", FileUtil.readWithChecksum(offsetFilePath))
+ validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100")
//Invoke test method again
taskStorageManager.flush()
@@ -440,7 +450,7 @@ class TestTaskStorageManager extends MockitoSugar {
val partition = new Partition(0)
val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
FileUtil.writeWithChecksum(offsetFilePath, "100")
val sspMetadataCache = mock[SSPMetadataCache]
@@ -470,7 +480,7 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"139\"}", FileUtil.readWithChecksum(offsetFilePath))
+ validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "139")
// Flush again
when(sspMetadataCache.getMetadata(ssp)).thenReturn(new SystemStreamPartitionMetadata("20", "193", "194"))
@@ -480,14 +490,32 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
- assertEquals("Found incorrect value in offset file!", "{\"kafka.testStream-loggedStore1.0\":\"193\"}", FileUtil.readWithChecksum(offsetFilePath))
+ validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "193")
+ }
+
+  /**
+   * Validates the contents of offsetFile against the expected ssp and offset.
+   * The legacy offset file ([[StorageManagerUtil.OFFSET_FILE_NAME_LEGACY]]) only contains the offset as a string,
+   * while the new offset file ([[StorageManagerUtil.OFFSET_FILE_NAME_NEW]]) contains a json map of ssp to offset.
+   *
+   * @param offsetFile the offset file to check; its name selects which of the two formats is expected
+   * @throws IllegalArgumentException if the file name matches neither of the two offset file names
+   */
+  private def validateOffsetFileContents(offsetFile: File, ssp: String, offset: String): Unit = {
+    // The file's own name decides the format; resolving the canonical path is unnecessary and can throw IOException
+    val expectedContents = offsetFile.getName match {
+      case StorageManagerUtil.OFFSET_FILE_NAME_NEW => "{\"" + ssp + "\":\"" + offset + "\"}"
+      case StorageManagerUtil.OFFSET_FILE_NAME_LEGACY => offset
+      case unknownName => throw new IllegalArgumentException("Invalid offset file name: " + unknownName)
+    }
+    assertEquals("Found incorrect value in offset file!", expectedContents, FileUtil.readWithChecksum(offsetFile))
   }
@Test
def testStopShouldNotCreateOffsetFileForEmptyStore() {
val partition = new Partition(0)
- val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFilePath = new File(StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName)
val sspMetadataCache = mock[SSPMetadataCache]
@@ -567,7 +595,7 @@ class TestTaskStorageManager extends MockitoSugar {
// Create a file in old single-offset format, with a sample offset
val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
val storeFile = new File(storeDirectory, "store.sst")
- val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFile = new File(storeDirectory, offsetFileName)
val sampleOldOffset = "912321"
FileUtil.writeWithChecksum(offsetFile, sampleOldOffset)
@@ -578,6 +606,28 @@ class TestTaskStorageManager extends MockitoSugar {
assertTrue(offsets.get(ssp).equals(sampleOldOffset))
}
+  @Test
+  def testReadOfOffsetInCaseOfBothFilesPresent(): Unit = {
+    // Independent of the offsetFileName parameter: write both a legacy single-offset file and a new-format file
+    val storeDirectory = StorageManagerUtil.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active)
+    val legacyOffsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)
+    val newOffsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+    val sampleOldOffset = "100000001"
+    val sampleNewOffset = "{\"kafka.test-stream.0\":\"200000002\"}"
+    FileUtil.writeWithChecksum(legacyOffsetFile, sampleOldOffset)
+    FileUtil.writeWithChecksum(newOffsetFile, sampleNewOffset)
+
+    // Ensure that both files exist before reading
+    assertTrue(legacyOffsetFile.exists())
+    assertTrue(newOffsetFile.exists())
+
+    // Read the offset for the ssp; the value in the new-format file must take precedence over the legacy one
+    val ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0))
+    val offsets = StorageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false)
+    assertEquals(1, offsets.size())
+    assertEquals("Offset from the new-format file should take precedence", "200000002", offsets.get(ssp))
+  }
+
private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
val systemName = "kafka"
val streamName = getStreamName(loggedStore)
@@ -590,7 +640,7 @@ class TestTaskStorageManager extends MockitoSugar {
val storeFile = new File(storeDirectory, "store.sst")
if (writeOffsetFile) {
- val offsetFile = new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW)
+ val offsetFile = new File(storeDirectory, offsetFileName)
if (fileOffset != null) {
FileUtil.writeWithChecksum(offsetFile, fileOffset)
} else {
@@ -711,6 +761,17 @@ class TestTaskStorageManager extends MockitoSugar {
}
}
+object TestTaskStorageManager {
+  /** Runs every test once per offset file name; `name` labels each run with that file name in test reports. */
+  @Parameters(name = "offsetFileName={0}") def parameters: util.Collection[Array[String]] = {
+    val offsetFileNames = new util.ArrayList[Array[String]]()
+    offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_NEW))
+    offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY))
+    offsetFileNames
+  }
+}
+
+
object TaskStorageManagerBuilder {
val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "store")
val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore")