You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/07/31 00:12:18 UTC
[samza] branch master updated: Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ef077cc Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)
ef077cc is described below
commit ef077ccc5d06eb59063692f1d30423b67be720a1
Author: shanthoosh <sp...@usc.edu>
AuthorDate: Tue Jul 30 17:12:13 2019 -0700
Remove the duplicate fetch system stream metadata API invocations from SamzaContainer startup sequence. (#1119)
---
.../apache/samza/container/SamzaContainer.scala | 1 +
.../org/apache/samza/container/TaskInstance.scala | 34 ++++++++++------------
.../org/apache/samza/container/TestRunLoop.java | 1 +
.../apache/samza/container/TestTaskInstance.scala | 2 ++
4 files changed, 20 insertions(+), 18 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index db8dbae..79b16b4 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -579,6 +579,7 @@ object SamzaContainer extends Logging {
exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics.get(taskName).get, taskConfig),
jobModel = jobModel,
streamMetadataCache = streamMetadataCache,
+ inputStreamMetadata = inputStreamMetadata,
timerExecutor = timerExecutor,
jobContext = jobContext,
containerContext = containerContext,
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 22879fa..a17a790 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -22,8 +22,6 @@ package org.apache.samza.container
import java.util.{Objects, Optional}
import java.util.concurrent.ScheduledExecutorService
-
-import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
import org.apache.samza.checkpoint.OffsetManager
import org.apache.samza.config.Config
@@ -31,7 +29,6 @@ import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.context._
import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.scheduler.{CallbackSchedulerImpl, EpochTimeScheduler, ScheduledCallback}
-import org.apache.samza.startpoint.Startpoint
import org.apache.samza.storage.kv.KeyValueStore
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system._
@@ -57,6 +54,7 @@ class TaskInstance(
val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
jobModel: JobModel = null,
streamMetadataCache: StreamMetadataCache = null,
+ inputStreamMetadata: Map[SystemStream, SystemStreamMetadata] = Map(),
timerExecutor : ScheduledExecutorService = null,
jobContext: JobContext,
containerContext: ContainerContext,
@@ -326,22 +324,22 @@ class TaskInstance(
* Check each partition assigned to the task is caught to the last offset
*/
def initCaughtUpMapping() {
- if (taskContext.getStreamMetadataCache != null) {
+ if (inputStreamMetadata != null && inputStreamMetadata.nonEmpty) {
systemStreamPartitions.foreach(ssp => {
- val partitionMetadata = taskContext
- .getStreamMetadataCache
- .getSystemStreamMetadata(ssp.getSystemStream, false)
- .getSystemStreamPartitionMetadata.get(ssp.getPartition)
-
- val upcomingOffset = partitionMetadata.getUpcomingOffset
- val startingOffset = offsetManager.getStartingOffset(taskName, ssp)
- .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp))
-
- // Mark ssp to be caught up if the starting offset is already the
- // upcoming offset, meaning the task has consumed all the messages
- // in this partition before and waiting for the future incoming messages.
- if(Objects.equals(upcomingOffset, startingOffset)) {
- ssp2CaughtupMapping(ssp) = true
+ if (inputStreamMetadata.contains(ssp.getSystemStream)) {
+ val partitionMetadata = inputStreamMetadata(ssp.getSystemStream)
+ .getSystemStreamPartitionMetadata.get(ssp.getPartition)
+
+ val upcomingOffset = partitionMetadata.getUpcomingOffset
+ val startingOffset = offsetManager.getStartingOffset(taskName, ssp)
+ .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format ssp))
+
+ // Mark ssp to be caught up if the starting offset is already the
+ // upcoming offset, meaning the task has consumed all the messages
+ // in this partition before and waiting for the future incoming messages.
+ if(Objects.equals(upcomingOffset, startingOffset)) {
+ ssp2CaughtupMapping(ssp) = true
+ }
}
})
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index 41e55ed..4556679 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -108,6 +108,7 @@ public class TestRunLoop {
null,
null,
null,
+ null,
mock(JobContext.class),
mock(ContainerContext.class),
Option.apply(null),
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 921e91b..0005ccd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -271,6 +271,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
.thenReturn(Collections.singletonMap(new Partition(0), sspMetadata))
val ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+ val inputStreamMetadata = collection.Map(ssp.getSystemStream -> systemStreamMetadata)
val taskInstance = new TaskInstance(this.task,
this.taskModel,
@@ -284,6 +285,7 @@ class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
systemStreamPartitions = Set(ssp),
exceptionHandler = this.taskInstanceExceptionHandler,
streamMetadataCache = cacheMock,
+ inputStreamMetadata = Map.empty ++ inputStreamMetadata,
jobContext = this.jobContext,
containerContext = this.containerContext,
applicationContainerContextOption = Some(this.applicationContainerContext),