You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/11/09 19:54:44 UTC

samza git commit: SAMZA-1480: TaskStorageManager improperly initializes changelog consumer position when restoring a store from disk

Repository: samza
Updated Branches:
  refs/heads/master 4269d6213 -> d35355aa0


SAMZA-1480: TaskStorageManager improperly initializes changelog consumer position when restoring a store from disk

Author: Jacob Maes <jm...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@linkedin.com>

Closes #350 from jmakes/samza-1480


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d35355aa
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d35355aa
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d35355aa

Branch: refs/heads/master
Commit: d35355aa018410ab5cf249d128b34912be085ca3
Parents: 4269d62
Author: Jacob Maes <jm...@linkedin.com>
Authored: Thu Nov 9 11:54:32 2017 -0800
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Thu Nov 9 11:54:32 2017 -0800

----------------------------------------------------------------------
 .../samza/storage/TaskStorageManager.scala      |  54 ++++-
 .../samza/storage/TestTaskStorageManager.scala  | 210 +++++++++++++++----
 .../scala/org/apache/samza/util/TestUtil.scala  |  16 ++
 3 files changed, 234 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index c8c935a..62dcdb0 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -68,7 +68,7 @@ class TaskStorageManager(
   }
 
   var changeLogOldestOffsets: Map[SystemStream, String] = Map()
-  val fileOffset: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
+  val fileOffsets: util.Map[SystemStreamPartition, String] = new util.HashMap[SystemStreamPartition, String]()
   val offsetFileName = "OFFSET"
 
   def apply(storageEngineName: String) = taskStores(storageEngineName)
@@ -104,7 +104,9 @@ class TaskStorageManager(
       } else {
         val offset = readOffsetFile(loggedStorePartitionDir)
         info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
-        fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+        if (offset != null) {
+          fileOffsets.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+        }
       }
     })
   }
@@ -210,7 +212,7 @@ class TaskStorageManager(
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemAdmin = systemAdmins
         .getOrElse(systemStream.getSystem,
-                   throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+                   throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
 
       systemAdmin.validateStream(changelogSpec)
@@ -228,12 +230,11 @@ class TaskStorageManager(
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
+      val admin = systemAdmins.getOrElse(systemStream.getSystem,
+        throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
       val consumer = storeConsumers(storeName)
-      val offset =
-        Option(fileOffset.get(systemStreamPartition))
-          .getOrElse(changeLogOldestOffsets
-            .getOrElse(systemStream, throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition)))
 
+      val offset = getStartingOffset(systemStreamPartition, admin)
       if (offset != null) {
         info("Registering change log consumer with offset %s for %s." format (offset, systemStreamPartition))
         consumer.register(systemStreamPartition, offset)
@@ -246,6 +247,43 @@ class TaskStorageManager(
     storeConsumers.values.foreach(_.start)
   }
 
+  /**
+    * Returns the offset with which the changelog consumer should be initialized for the given SystemStreamPartition.
+    *
+    * If a file offset exists, it represents the last changelog offset which is also reflected in the on-disk state.
+    * In that case, we use the next offset after the file offset, as long as it is newer than the oldest offset
+    * currently available in the stream.
+    *
+    * If there isn't a file offset or it's older than the oldest available offset, we simply start with the oldest.
+    *
+    * @param systemStreamPartition  the changelog partition for which the offset is needed.
+    * @param admin                  the [[SystemAdmin]] for the changelog.
+    * @return                       the offset from which the changelog consumer should be initialized.
+    */
+  private def getStartingOffset(systemStreamPartition: SystemStreamPartition, admin: SystemAdmin) = {
+    val fileOffset = fileOffsets.get(systemStreamPartition)
+    val oldestOffset = changeLogOldestOffsets
+      .getOrElse(systemStreamPartition.getSystemStream,
+        throw new SamzaException("Missing a change log offset for %s." format systemStreamPartition))
+
+    if (fileOffset != null) {
+      // File offset was the last message written to the changelog that is also reflected in the store,
+      // so we start with the NEXT offset
+      val resumeOffset = admin.getOffsetsAfter(Map(systemStreamPartition -> fileOffset).asJava).get(systemStreamPartition)
+      if (admin.offsetComparator(oldestOffset, resumeOffset) <= 0) {
+        resumeOffset
+      } else {
+        // If the offset we plan to use is older than the oldest offset, just use the oldest offset.
+        // This can happen with changelogs configured with a TTL cleanup policy
+        warn(s"Local store offset $resumeOffset is lower than the oldest offset $oldestOffset of the changelog. " +
+          s"The values between these offsets cannot be restored.")
+        oldestOffset
+      }
+    } else {
+      oldestOffset
+    }
+  }
+
   private def restoreStores() {
     debug("Restoring stores.")
 
@@ -298,7 +336,7 @@ class TaskStorageManager(
     for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) {
       val systemAdmin = systemAdmins
               .getOrElse(systemStream.getSystem,
-                         throw new SamzaException("Unable to get systemAdmin for store " + storeName + " and systemStream" + systemStream))
+                         throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
 
       debug("Fetching newest offset for store %s" format(storeName))
       try {

http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
index ea4d37b..29f6eb7 100644
--- a/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
@@ -20,18 +20,16 @@
 package org.apache.samza.storage
 
 
-import java.io.File
+import java.io.{File, FileOutputStream, ObjectOutputStream}
 import java.util
 
 import org.apache.samza.Partition
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.StorageConfig
+import org.apache.samza.config.{MapConfig, StorageConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
-import org.apache.samza.util.SystemClock
-import org.apache.samza.util.Util
+import org.apache.samza.util.{SystemClock, Util}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Matchers._
@@ -77,19 +75,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeFile = new File(storeDirectory, "store.sst")
     val offsetFile = new File(storeDirectory, "OFFSET")
 
-    // getStoreProperties should always return the same StoreProperties
-    val mockStorageEngine = mock[StorageEngine]
-    when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
-      override def answer(invocation: InvocationOnMock): StoreProperties = {
-        new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build()
-      }
-    })
-    // Restore simply creates the file
-    when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
-      override def answer(invocation: InvocationOnMock): Unit = {
-        storeFile.createNewFile()
-      }
-    })
+    val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
 
     // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
     val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -192,15 +178,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val ssp = new SystemStreamPartition(ss, partition)
     val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName)
 
-    // getStoreProperties should always return the same StoreProperties
-    val mockStorageEngine = mock[StorageEngine]
-    when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
-      override def answer(invocation: InvocationOnMock): StoreProperties = {
-        new StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(false).build()
-      }
-    })
-    // Restore simply creates the file
-    doNothing().when(mockStorageEngine).restore(any())
+    val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null)
 
     // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
     val mockStreamMetadataCache = mock[StreamMetadataCache]
@@ -308,7 +286,7 @@ class TestTaskStorageManager extends MockitoSugar {
     cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
 
     assertTrue("Offset file was removed. Clean up failed!", offsetFilePath.exists())
-    assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
+    assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffsets.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
   }
 
   @Test
@@ -510,6 +488,167 @@ class TestTaskStorageManager extends MockitoSugar {
     //Check conditions
     assertTrue("Offset file should not exist!", !offsetFilePath.exists())
   }
+
+  @Test
+  def testCleanBaseDirsShouldNotAddNullOffsetsToFileOffsetsMap(): Unit = {
+    // If a null file offset were allowed and the full Map were passed to SystemAdmin.getOffsetsAfter, an NPE
+    // could occur for some SystemAdmin implementations.
+    val writeOffsetFile = true
+    val fileOffset = null
+    val oldestOffset = "3"
+    val newestOffset = "150"
+    val upcomingOffset = "151"
+    val expectedRegisteredOffset = "3"
+
+    testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+  }
+
+  @Test
+  def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetValid(): Unit = {
+    // We should register the offset AFTER the stored file offset.
+    // The file offset represents the last changelog message that is also reflected in the store. So start with next one.
+    val writeOffsetFile = true
+    val fileOffset = "139"
+    val oldestOffset = "3"
+    val newestOffset = "150"
+    val upcomingOffset = "151"
+    val expectedRegisteredOffset = "140"
+
+    testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+  }
+
+  @Test
+  def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetOlderThanOldestOffset(): Unit = {
+    // We should register the oldest offset if the file offset is older than the oldest available offset
+    val writeOffsetFile = true
+    val fileOffset = "139"
+    val oldestOffset = "145"
+    val newestOffset = "150"
+    val upcomingOffset = "151"
+    val expectedRegisteredOffset = "145"
+
+    testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+  }
+
+  @Test
+  def testStartConsumersShouldRegisterCorrectOffsetWhenOldestOffsetGreaterThanZero(): Unit = {
+    val writeOffsetFile = false
+    val fileOffset = null
+    val oldestOffset = "3"
+    val newestOffset = "150"
+    val upcomingOffset = "151"
+    val expectedRegisteredOffset = "3"
+
+    testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile)
+  }
+
+  private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = {
+    val systemName = "kafka"
+    val streamName = "testStream"
+    val partitionCount = 1
+    // Basic test setup of SystemStream, SystemStreamPartition for this task
+    val ss = new SystemStream(systemName, streamName)
+    val partition = new Partition(0)
+    val ssp = new SystemStreamPartition(ss, partition)
+    val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+    val storeFile = new File(storeDirectory, "store.sst")
+
+    if (writeOffsetFile) {
+      val offsetFile = new File(storeDirectory, "OFFSET")
+      if (fileOffset != null) {
+        Util.writeDataToFile(offsetFile, fileOffset)
+      } else {
+        // Write garbage to produce a null result when it's read
+        val fos = new FileOutputStream(offsetFile)
+        val oos = new ObjectOutputStream(fos)
+        oos.writeLong(1)
+        oos.writeUTF("Bad Offset")
+        oos.close()
+        fos.close()
+      }
+    }
+
+    val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile)
+
+    // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin
+    val mockStreamMetadataCache = mock[StreamMetadataCache]
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    val changelogSpec = StreamSpec.createChangeLogStreamSpec(streamName, systemName, partitionCount)
+    doNothing().when(mockSystemAdmin).validateStream(changelogSpec)
+    when(mockSystemAdmin.getOffsetsAfter(any())).thenAnswer(new Answer[util.Map[SystemStreamPartition, String]] {
+      override def answer(invocation: InvocationOnMock): util.Map[SystemStreamPartition, String] = {
+        val originalOffsets = invocation.getArgumentAt(0, classOf[util.Map[SystemStreamPartition, String]])
+        originalOffsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
+      }
+    })
+    when(mockSystemAdmin.offsetComparator(any(), any())).thenAnswer(new Answer[Integer] {
+      override def answer(invocation: InvocationOnMock): Integer = {
+        val offset1 = invocation.getArgumentAt(0, classOf[String])
+        val offset2 = invocation.getArgumentAt(1, classOf[String])
+        offset1.toLong compare offset2.toLong
+      }
+    })
+
+    val mockSystemConsumer = mock[SystemConsumer]
+    when(mockSystemConsumer.register(any(), any())).thenAnswer(new Answer[Unit] {
+      override def answer(invocation: InvocationOnMock): Unit = {
+        val args = invocation.getArguments
+        if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) {
+          val offset = args.apply(1).asInstanceOf[String]
+          assertNotNull(offset)
+          assertEquals(expectedRegisteredOffset, offset)
+        }
+      }
+    })
+    doNothing().when(mockSystemConsumer).stop()
+
+    // Test 1: Initial invocation - No store on disk (only changelog has data)
+    // Setup initial sspMetadata
+    val sspMetadata = new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)
+    var metadata = new SystemStreamMetadata(streamName, new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
+      {
+        put(partition, sspMetadata)
+      }
+    })
+    when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata))
+    when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata]() {
+      {
+        put(streamName, metadata)
+      }
+    })
+
+    val taskManager = new TaskStorageManagerBuilder()
+      .addStore(loggedStore, mockStorageEngine, mockSystemConsumer)
+      .setStreamMetadataCache(mockStreamMetadataCache)
+      .setSystemAdmin(systemName, mockSystemAdmin)
+      .build
+
+    taskManager.init
+
+    verify(mockSystemConsumer).register(any(classOf[SystemStreamPartition]), anyString())
+  }
+
+  private def createMockStorageEngine(isLoggedStore: Boolean, isPersistedStore: Boolean, storeFile: File) = {
+    val mockStorageEngine = mock[StorageEngine]
+    // getStoreProperties should always return the same StoreProperties
+    when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] {
+      override def answer(invocation: InvocationOnMock): StoreProperties = {
+        new StorePropertiesBuilder().setLoggedStore(isLoggedStore).setPersistedToDisk(isPersistedStore).build()
+      }
+    })
+    // Restore simply creates the file
+    if (storeFile != null) {
+      when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] {
+        override def answer(invocation: InvocationOnMock): Unit = {
+          storeFile.createNewFile()
+        }
+      })
+    } else {
+      doNothing().when(mockStorageEngine).restore(any())
+    }
+    mockStorageEngine
+  }
 }
 
 object TaskStorageManagerBuilder {
@@ -536,16 +675,11 @@ class TaskStorageManagerBuilder extends MockitoSugar {
     this
   }
 
-  def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder =  {
-    taskStores = taskStores ++ {
-      val mockStorageEngine = mock[StorageEngine]
-      when(mockStorageEngine.getStoreProperties)
-        .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
-      Map(storeName -> mockStorageEngine)
-    }
-    storeConsumers = storeConsumers ++ Map(storeName -> mock[SystemConsumer])
-    changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", "testStream"))
-    this
+  def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = {
+    val mockStorageEngine = mock[StorageEngine]
+    when(mockStorageEngine.getStoreProperties)
+      .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build())
+    addStore(storeName, mockStorageEngine, mock[SystemConsumer])
   }
 
   def setPartition(p: Partition) = {

http://git-wip-us.apache.org/repos/asf/samza/blob/d35355aa/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index 1f7dc01..fae735b 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -64,7 +64,23 @@ class TestUtil {
 
     // Check data returned
     assertEquals(data, result)
+  }
 
+  @Test
+  def testReadInvalidDataFromFile() {
+    // Write garbage to produce a null result when it's read
+    val fos = new FileOutputStream(file)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeLong(1)
+    oos.writeUTF("Junk Data")
+    oos.close()
+    fos.close()
+
+    // Invoke test
+    val result = Util.readDataFromFile(file)
+
+    // Check data returned
+    assertNull(result)
   }
 
   @Test