You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/02/28 23:24:42 UTC

[samza] branch master updated: SAMZA-2071: Skip StartpointManager instantiation for PassThroughCoordinator. (#933)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new c49a218  SAMZA-2071: Skip StartpointManager instantiation for PassThroughCoordinator. (#933)
c49a218 is described below

commit c49a218f7126b6cb45d61551bf94c83fa195f56b
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Thu Feb 28 15:24:37 2019 -0800

    SAMZA-2071: Skip StartpointManager instantiation for PassThroughCoordinator. (#933)
---
 .../main/scala/org/apache/samza/container/SamzaContainer.scala    | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 094f49a..f8d799c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -426,18 +426,20 @@ object SamzaContainer extends Logging {
       .orNull
     info("Got checkpoint manager: %s" format checkpointManager)
 
-    val metadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
+    val startpointMetadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
       .map(Util.getObj(_, classOf[MetadataStoreFactory]))
       .orNull
-    val startpointManager = {
+    val startpointManager = if (startpointMetadataStoreFactory != null) {
       try {
-        Option(new StartpointManager(metadataStoreFactory, config, samzaContainerMetrics.registry))
+        Option(new StartpointManager(startpointMetadataStoreFactory, config, samzaContainerMetrics.registry))
       } catch {
         case e: Exception => {
           error("Unable to get an instance of the StartpointManager. Continuing without one.", e)
           None
         }
       }
+    } else {
+      None
     }
 
     // create a map of consumers with callbacks to pass to the OffsetManager