You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/31 00:12:18 UTC

[samza] branch master updated: Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ef077cc  Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)
ef077cc is described below

commit ef077ccc5d06eb59063692f1d30423b67be720a1
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Tue Jul 30 17:12:13 2019 -0700

    Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)
---
 .../apache/samza/container/SamzaContainer.scala    |  1 +
 .../org/apache/samza/container/TaskInstance.scala  | 34 ++++++++++------------
 .../org/apache/samza/container/TestRunLoop.java    |  1 +
 .../apache/samza/container/TestTaskInstance.scala  |  2 ++
 4 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index db8dbae..79b16b4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -579,6 +579,7 @@ object SamzaContainer extends Logging {
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, taskConfig),
           jobModel = jobModel,
           streamMetadataCache = streamMetadataCache,
+          inputStreamMetadata = inputStreamMetadata,
           timerExecutor = timerExecutor,
           jobContext = jobContext,
           containerContext = containerContext,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 22879fa..a17a790 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -22,8 +22,6 @@ package org.apache.samza.container
 
 import java.util.{Objects, Optional}
 import java.util.concurrent.ScheduledExecutorService
-
-import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.checkpoint.OffsetManager
 import org.apache.samza.config.Config
@@ -31,7 +29,6 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.context._
 import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback}
-import org.apache.samza.startpoint.Startpoint
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system._
@@ -57,6 +54,7 @@ class TaskInstance(
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
   jobModel: JobModel = null,
   streamMetadataCache: StreamMetadataCache = null,
+  inputStreamMetadata: Map[SystemStream, SystemStreamMetadata] = Map(),
   timerExecutor : ScheduledExecutorService = null,
   jobContext: JobContext,
   containerContext: ContainerContext,
@@ -326,22 +324,22 @@ class TaskInstance(
     * Check each partition assigned to the task is caught to the last offset
     */
   def initCaughtUpMapping() {
-    if (taskContext.getStreamMetadataCache != null) {
+    if (inputStreamMetadata != null && inputStreamMetadata.nonEmpty) {
       systemStreamPartitions.foreach(ssp => {
-        val partitionMetadata = taskContext
-          .getStreamMetadataCache
-          .getSystemStreamMetadata(ssp.getSystemStream, false)
-          .getSystemStreamPartitionMetadata.get(ssp.getPartition)
-
-        val upcomingOffset = partitionMetadata.getUpcomingOffset
-        val startingOffset = offsetManager.getStartingOffset(taskName, ssp)
-          .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp))
-
-        // Mark ssp to be caught up if the starting offset is already the
-        // upcoming offset, meaning the task has consumed all the messages
-        // in this partition before and waiting for the future incoming messages.
-        if(Objects.equals(upcomingOffset, startingOffset)) {
-          ssp2CaughtupMapping(ssp) = true
+        if (inputStreamMetadata.contains(ssp.getSystemStream)) {
+          val partitionMetadata = inputStreamMetadata(ssp.getSystemStream)
+            .getSystemStreamPartitionMetadata.get(ssp.getPartition)
+
+          val upcomingOffset = partitionMetadata.getUpcomingOffset
+          val startingOffset = offsetManager.getStartingOffset(taskName, ssp)
+            .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp))
+
+          // Mark the ssp as caught up if the starting offset already equals the
+          // upcoming offset, meaning the task has previously consumed all the messages
+          // in this partition and is now waiting for future incoming messages.
+          if(Objects.equals(upcomingOffset, startingOffset)) {
+            ssp2CaughtupMapping(ssp) = true
+          }
         }
       })
     }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 41e55ed..4556679 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -108,6 +108,7 @@ public class TestRunLoop {
         null,
         null,
         null,
+        null,
         mock(JobContext.class),
         mock(ContainerContext.class),
         Option.apply(null),
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 921e91b..0005ccd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -271,6 +271,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
       .thenReturn(Collections.singletonMap(new Partition(0), sspMetadata))
 
     val ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val inputStreamMetadata = collection.Map(ssp.getSystemStream -> systemStreamMetadata)
 
     val taskInstance = new TaskInstance(this.task,
       this.taskModel,
@@ -284,6 +285,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
       systemStreamPartitions = Set(ssp),
       exceptionHandler = this.taskInstanceExceptionHandler,
       streamMetadataCache = cacheMock,
+      inputStreamMetadata = Map.empty ++ inputStreamMetadata,
       jobContext = this.jobContext,
       containerContext = this.containerContext,
       applicationContainerContextOption = Some(this.applicationContainerContext),