You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/11/09 19:54:44 UTC
samza git commit: SAMZA-1480: TaskStorageManager improperly initializes changelog consumer position when restoring a store from disk
Repository: samza
Updated Branches:
refs/heads/master 4269d6213 -> d35355aa0
SAMZA-1480: TaskStorageManager improperly initializes changelog consumer position when restoring a store from disk
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #350 from jmakes/samza-1480
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d35355aa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d35355aa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d35355aa
Branch: refs/heads/master
Commit: d35355aa018410ab5cf249d128b34912be085ca3
Parents: 4269d62
Author: Jacob Maes <jm...@linkedin.com>
Authored: Thu Nov 9 11:54:32 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu Nov 9 11:54:32 2017 -0800
----------------------------------------------------------------------
.../samza/storage/TaskStorageManager.scala | 54 ++++-
.../samza/storage/TestTaskStorageManager.scala | 210 +++++++++++++++----
.../scala/org/apache/samza/util/TestUtil.scala | 16 ++
3 files changed, 234 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c8c935a..62dcdb0 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -68,7 +68,7 @@ class TaskStorageManager(
}
var changeLogOldestOffsets: Map[SystemStream, String] = Map()
- val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
+ val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
val offsetFileName = "OFFSET"
def apply(storageEngineName: String) = taskStores(storageEngineName)
@@ -104,7 +104,9 @@ class TaskStorageManager(
} else {
val offset = readOffsetFile(loggedStorePartitionDir)
info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
- fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+ if (offset != null) {
+ fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+ }
}
})
}
@@ -210,7 +212,7 @@ class TaskStorageManager(
for ((storeName, systemStream) <- changeLogSystemStreams) {
val systemAdmin = systemAdmins
.getOrElse(systemStream.getSystem,
- throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+ throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
systemAdmin.validateStream(changelogSpec)
@@ -228,12 +230,11 @@ class TaskStorageManager(
for ((storeName, systemStream) <- changeLogSystemStreams) {
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+ val admin = systemAdmins.getOrElse(systemStream.getSystem,
+ throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
val consumer = storeConsumers(storeName)
- val offset =
- Option(fileOffset.get(systemStreamPartition))
- .getOrElse(changeLogOldestOffsets
- .getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)))
+ val offset = getStartingOffset(systemStreamPartition, admin)
if (offset != null) {
info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition))
consumer.register(systemStreamPartition, offset)
@@ -246,6 +247,43 @@ class TaskStorageManager(
storeConsumers.values.foreach(_.start)
}
+ /**
+ * Returns the offset with which the changelog consumer should be initialized for the given SystemStreamPartition.
+ *
+ * If a file offset exists, it represents the last changelog offset which is also reflected in the on-disk state.
+ * In that case, we use the offset immediately after the file offset, as long as it is not older than the oldest
+ * offset currently available in the stream (i.e. offsetComparator(oldest, resume) <= 0).
+ *
+ * If there is no file offset, or the offset after it is older than the oldest available offset, we start with the oldest.
+ *
+ * @param systemStreamPartition the changelog partition for which the offset is needed.
+ * @param admin the [[SystemAdmin]] for the changelog.
+ * @return the offset from which the changelog consumer should be initialized.
+ */
+ private def getStartingOffset(systemStreamPartition: SystemStreamPartition, admin: SystemAdmin) = {
+ val fileOffset = fileOffsets.get(systemStreamPartition)
+ val oldestOffset = changeLogOldestOffsets
+ .getOrElse(systemStreamPartition.getSystemStream,
+ throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
+
+ if (fileOffset != null) {
+ // The file offset is the last changelog message already reflected in the store, so resume at the NEXT offset.
+ // NOTE(review): offsetComparator returns a boxed Integer and `<= 0` unboxes it, so a null result (or a missing resumeOffset) would throw — confirm every SystemAdmin returns non-null here.
+ val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition)
+ if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
+ resumeOffset
+ } else {
+ // The offset we planned to resume from is older than the oldest offset still in the changelog, so use the oldest.
+ // Per the original author this can happen with changelogs configured with a TTL cleanup policy.
+ warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " +
+ s"The values between these offsets cannot be restored.")
+ oldestOffset
+ }
+ } else {
+ oldestOffset
+ }
+ }
+
private def restoreStores() {
debug("Restoring stores.")
@@ -298,7 +336,7 @@ class TaskStorageManager(
for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) {
val systemAdmin = systemAdmins
.getOrElse(systemStream.getSystem,
- throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+ throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
debug("Fetching newest offset for store %s" format(storeName))
try {
http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index ea4d37b..29f6eb7 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -20,18 +20,16 @@
package org.apache.samza.storage
-import java.io.File
+import java.io.{File, FileOutputStream, ObjectOutputStream}
import java.util
import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.StorageConfig
+import org.apache.samza.config.{MapConfig, StorageConfig}
import org.apache.samza.container.TaskName
import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
-import org.apache.samza.util.SystemClock
-import org.apache.samza.util.Util
+import org.apache.samza.util.{SystemClock, Util}
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.mockito.Matchers._
@@ -77,19 +75,7 @@ class TestTaskStorageManager extends MockitoSugar {
val storeFile = new File(storeDirectory, "store.sst")
val offsetFile = new File(storeDirectory, "OFFSET")
- // getStoreProperties should always return the same StoreProperties
- val mockStorageEngine = mock[StorageEngine]
- when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
- override def answer(invocation: InvocationOnMock): StoreProperties = {
- new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build()
- }
- })
- // Restore simply creates the file
- when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
- override def answer(invocation: InvocationOnMock): Unit = {
- storeFile.createNewFile()
- }
- })
+ val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -192,15 +178,7 @@ class TestTaskStorageManager extends MockitoSugar {
val ssp = new SystemStreamPartition(ss, partition)
val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
- // getStoreProperties should always return the same StoreProperties
- val mockStorageEngine = mock[StorageEngine]
- when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
- override def answer(invocation: InvocationOnMock): StoreProperties = {
- new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(false).build()
- }
- })
- // Restore simply creates the file
- doNothing().when(mockStorageEngine).restore(any())
+ val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
// Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -308,7 +286,7 @@ class TestTaskStorageManager extends MockitoSugar {
cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
assertTrue("Offset file was removed. Clean up failed!", offsetFilePath.exists())
- assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
+ assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffsets.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
}
@Test
@@ -510,6 +488,167 @@ class TestTaskStorageManager extends MockitoSugar {
//Check conditions
assertTrue("Offset file should not exist!", !offsetFilePath.exists())
}
+
+ @Test
+ def testCleanBaseDirsShouldNotAddNullOffsetsToFileOffsetsMap(): Unit = {
+ // If a null file offset were kept and the map were passed to SystemAdmin.getOffsetsAfter, an NPE could occur in
+ // some SystemAdmin implementations. An unreadable OFFSET file must instead fall back to the oldest offset ("3").
+ val writeOffsetFile = true
+ val fileOffset = null
+ val oldestOffset = "3"
+ val newestOffset = "150"
+ val upcomingOffset = "151"
+ val expectedRegisteredOffset = "3"
+
+ testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+ }
+
+ @Test
+ def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetValid(): Unit = {
+ // The consumer should be registered at the offset AFTER the stored file offset (139 -> 140): the file offset
+ // is the last changelog message already reflected in the store, so restoration resumes with the next one.
+ val writeOffsetFile = true
+ val fileOffset = "139"
+ val oldestOffset = "3"
+ val newestOffset = "150"
+ val upcomingOffset = "151"
+ val expectedRegisteredOffset = "140"
+
+ testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+ }
+
+ @Test
+ def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetOlderThanOldestOffset(): Unit = {
+ // The offset after the file offset (140) is older than the oldest changelog offset (145), so register the oldest.
+ val writeOffsetFile = true
+ val fileOffset = "139"
+ val oldestOffset = "145"
+ val newestOffset = "150"
+ val upcomingOffset = "151"
+ val expectedRegisteredOffset = "145"
+
+ testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+ }
+
+ @Test
+ def testStartConsumersShouldRegisterCorrectOffsetWhenOldestOffsetGreaterThanZero(): Unit = {
+ val writeOffsetFile = false // no OFFSET file on disk, so registration should start at the (non-zero) oldest offset
+ val fileOffset = null
+ val oldestOffset = "3"
+ val newestOffset = "150"
+ val upcomingOffset = "151"
+ val expectedRegisteredOffset = "3"
+
+ testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+ }
+
+ private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = { // inits a TaskStorageManager over mocked changelog metadata and checks the offset passed to SystemConsumer.register
+ val systemName = "kafka"
+ val streamName = "testStream"
+ val partitionCount = 1
+ // Basic test setup of SystemStream, SystemStreamPartition for this task
+ val ss = new SystemStream(systemName, streamName)
+ val partition = new Partition(0)
+ val ssp = new SystemStreamPartition(ss, partition)
+ val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+ val storeFile = new File(storeDirectory, "store.sst")
+
+ if (writeOffsetFile) {
+ val offsetFile = new File(storeDirectory, "OFFSET")
+ if (fileOffset != null) {
+ Util.writeDataToFile(offsetFile, fileOffset)
+ } else {
+ // Write content Util.readDataFromFile cannot parse, so the offset read back from the file is null
+ val fos = new FileOutputStream(offsetFile)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeLong(1)
+ oos.writeUTF("Bad Offset")
+ oos.close()
+ fos.close()
+ }
+ }
+
+ val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
+
+ // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
+ val mockStreamMetadataCache = mock[StreamMetadataCache]
+
+ val mockSystemAdmin = mock[SystemAdmin]
+ val changelogSpec = StreamSpec.createChangeLogStreamSpec(streamName, systemName, partitionCount)
+ doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
+ when(mockSystemAdmin.getOffsetsAfter(any())).thenAnswer(new Answer[util.Map[SystemStreamPartition, String]] { // mock: "next" offset = numeric offset + 1
+ override def answer(invocation: InvocationOnMock): util.Map[SystemStreamPartition, String] = {
+ val originalOffsets = invocation.getArgumentAt(0, classOf[util.Map[SystemStreamPartition, String]])
+ originalOffsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
+ }
+ })
+ when(mockSystemAdmin.offsetComparator(any(), any())).thenAnswer(new Answer[Integer] { // mock: compare offsets numerically
+ override def answer(invocation: InvocationOnMock): Integer = {
+ val offset1 = invocation.getArgumentAt(0, classOf[String])
+ val offset2 = invocation.getArgumentAt(1, classOf[String])
+ offset1.toLong compare offset2.toLong
+ }
+ })
+
+ val mockSystemConsumer = mock[SystemConsumer]
+ when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] { // assert the offset registered for our SSP
+ override def answer(invocation: InvocationOnMock): Unit = {
+ val args = invocation.getArguments
+ if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
+ val offset = args.apply(1).asInstanceOf[String]
+ assertNotNull(offset)
+ assertEquals(expectedRegisteredOffset, offset)
+ }
+ }
+ })
+ doNothing().when(mockSystemConsumer).stop()
+
+ // Changelog metadata for the single partition: oldest, newest and upcoming offsets.
+ // The metadata cache and the system admin both report this same metadata.
+ val sspMetadata = new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)
+ var metadata = new SystemStreamMetadata(streamName, new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+ {
+ put(partition, sspMetadata)
+ }
+ })
+ when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata))
+ when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata]() {
+ {
+ put(streamName, metadata)
+ }
+ })
+
+ val taskManager = new TaskStorageManagerBuilder()
+ .addStore(loggedStore, mockStorageEngine, mockSystemConsumer)
+ .setStreamMetadataCache(mockStreamMetadataCache)
+ .setSystemAdmin(systemName, mockSystemAdmin)
+ .build
+
+ taskManager.init
+
+ verify(mockSystemConsumer).register(any(classOf[SystemStreamPartition]), anyString()) // the assertions above only run if register was actually called
+ }
+
+ private def createMockStorageEngine(isLoggedStore: Boolean, isPersistedStore: Boolean, storeFile: File) = {
+ val mockStorageEngine = mock[StorageEngine]
+ // Builds a mock StorageEngine whose getStoreProperties always reports the given logged/persisted flags
+ when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
+ override def answer(invocation: InvocationOnMock): StoreProperties = {
+ new StorePropertiesBuilder().setLoggedStore(isLoggedStore).setPersistedToDisk(isPersistedStore).build()
+ }
+ })
+ // When storeFile is non-null, restore simply creates that file; when it is null, restore does nothing
+ if (storeFile != null) {
+ when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
+ override def answer(invocation: InvocationOnMock): Unit = {
+ storeFile.createNewFile()
+ }
+ })
+ } else {
+ doNothing().when(mockStorageEngine).restore(any())
+ }
+ mockStorageEngine
+ }
}
object TaskStorageManagerBuilder {
@@ -536,16 +675,11 @@ class TaskStorageManagerBuilder extends MockitoSugar {
this
}
- def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
- taskStores = taskStores ++ {
- val mockStorageEngine = mock[StorageEngine]
- when(mockStorageEngine.getStoreProperties)
- .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
- Map(storeName -> mockStorageEngine)
- }
- storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer])
- changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream"))
- this
+ def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
+ val mockStorageEngine = mock[StorageEngine]
+ when(mockStorageEngine.getStoreProperties)
+ .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
+ addStore(storeName, mockStorageEngine, mock[SystemConsumer]) // delegates to the (name, engine, consumer) overload, presumably doing the consumer/changelog bookkeeping the removed lines did
}
def setPartition(p: Partition) = {
http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 1f7dc01..fae735b 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -64,7 +64,23 @@ class TestUtil {
// Check data returned
assertEquals(data, result)
+ }
+ @Test
+ def testReadInvalidDataFromFile() {
+ // Write bytes that are not a valid serialized payload, so reading them back should produce null
+ val fos = new FileOutputStream(file)
+ val oos = new ObjectOutputStream(fos)
+ oos.writeLong(1)
+ oos.writeUTF("Junk Data")
+ oos.close()
+ fos.close()
+
+ // Invoke test
+ val result = Util.readDataFromFile(file)
+
+ // Unparseable content must read back as null rather than throwing
+ assertNull(result)
}
@Test