You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:54 UTC
[08/23] samza git commit: Fix log messages from StreamProcessor (onJobModelExpired event).
Fix log messages from StreamProcessor(onJobModelExpired event).
Log messages published in the onJobModelExpired event show `processorId` as null. The value was cached in a final local variable (`pid`) in the `createJobCoordinatorListener` method. Per the JLS, a final variable is initialized at its declaration, which in this case happens before the constructor has assigned the instance variable. As a result, the local final copy is set to null (since it relies on the instance variable's value being set in the constructor).
Changes
* Use `processorId` directly in the `createJobCoordinatorListener` method.
* Remove StreamProcessor.toString since it has no usages.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #249 from shanthoosh/fix_logging_in_stream_processor
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/91b22fd7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/91b22fd7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/91b22fd7
Branch: refs/heads/0.14.0
Commit: 91b22fd773e9e24b77a1f29bde2ba4fa64e5a82a
Parents: 1c11393
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Jul 25 10:35:31 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Jul 25 10:35:31 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/processor/StreamProcessor.java | 10 ++--------
1 file changed, 2 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/91b22fd7/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 89edd16..590fa11 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -111,11 +111,6 @@ public class StreamProcessor {
this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
}
- @Override
- public String toString() {
- return "Processor:" + processorId;
- }
-
/* package private */
JobCoordinator getJobCoordinator() {
return Util.
@@ -210,7 +205,6 @@ public class StreamProcessor {
}
JobCoordinatorListener createJobCoordinatorListener() {
- final String pid = this.toString();
return new JobCoordinatorListener() {
@Override
@@ -220,7 +214,7 @@ public class StreamProcessor {
if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
boolean shutdownComplete = false;
try {
- LOGGER.info("Shutting down container in onJobModelExpired for processor:" + pid);
+ LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId);
container.pause();
shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
LOGGER.info("ShutdownComplete=" + shutdownComplete);
@@ -231,7 +225,7 @@ public class StreamProcessor {
} catch (InterruptedException e) {
LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
}
- LOGGER.info("Shutting down container done for pid=" + pid + "; complete =" + shutdownComplete);
+ LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete);
if (!shutdownComplete) {
LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " +
"Stopping the processor.");