You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/02/28 23:24:42 UTC
[samza] branch master updated: SAMZA-2071: Skip StartpointManager
instantiation for PassThroughCoordinator. (#933)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new c49a218 SAMZA-2071: Skip StartpointManager instantiation for PassThroughCoordinator. (#933)
c49a218 is described below
commit c49a218f7126b6cb45d61551bf94c83fa195f56b
Author: shanthoosh <sv...@linkedin.com>
AuthorDate: Thu Feb 28 15:24:37 2019 -0800
SAMZA-2071: Skip StartpointManager instantiation for PassThroughCoordinator. (#933)
---
.../main/scala/org/apache/samza/container/SamzaContainer.scala | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 094f49a..f8d799c 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -426,18 +426,20 @@ object SamzaContainer extends Logging {
.orNull
info("Got checkpoint manager: %s" format checkpointManager)
- val metadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
+ val startpointMetadataStoreFactory = Option(config.getStartpointMetadataStoreFactory)
.map(Util.getObj(_, classOf[MetadataStoreFactory]))
.orNull
- val startpointManager = {
+ val startpointManager = if (startpointMetadataStoreFactory != null) {
try {
- Option(new StartpointManager(metadataStoreFactory, config, samzaContainerMetrics.registry))
+ Option(new StartpointManager(startpointMetadataStoreFactory, config, samzaContainerMetrics.registry))
} catch {
case e: Exception => {
error("Unable to get an instance of the StartpointManager. Continuing without one.", e)
None
}
}
+ } else {
+ None
}
// create a map of consumers with callbacks to pass to the OffsetManager