You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/04/12 21:34:48 UTC
samza git commit: SAMZA-1584: Improve logging in StreamProcessor.
Repository: samza
Updated Branches:
refs/heads/master 3895a9070 -> 76de840c7
SAMZA-1584: Improve logging in StreamProcessor.
Add the processorID to the log lines wherever necessary (since we support running multiple stream applications in a JVM) and improve logging in general in StreamProcessor.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Author: Shanthoosh Venkataraman <sv...@lm-lsnscdw5132.linkedin.biz>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #441 from shanthoosh/SAMZA-1584
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/76de840c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/76de840c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/76de840c
Branch: refs/heads/master
Commit: 76de840c734fe2b7987af22d1ba6133437c25a5e
Parents: 3895a90
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Thu Apr 12 14:34:45 2018 -0700
Committer: Prateek Maheshwari <pm...@linkedin.com>
Committed: Thu Apr 12 14:34:45 2018 -0700
----------------------------------------------------------------------
.../apache/samza/processor/StreamProcessor.java | 54 ++++++++++----------
1 file changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/76de840c/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index b548200..8dacc6c 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.annotation.InterfaceStability;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
@InterfaceStability.Evolving
public class StreamProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(StreamProcessor.class);
+ private static final String CONTAINER_THREAD_NAME_FORMAT = "Samza StreamProcessor Container Thread-%d";
private final JobCoordinator jobCoordinator;
private final StreamProcessorLifecycleListener processorListener;
@@ -71,7 +73,7 @@ public class StreamProcessor {
// Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
// stopped due to re-balancing
- /* package private */volatile CountDownLatch jcContainerShutdownLatch;
+ volatile CountDownLatch jcContainerShutdownLatch;
private volatile boolean processorOnStartCalled = false;
@VisibleForTesting
@@ -179,11 +181,12 @@ public class StreamProcessor {
boolean containerShutdownInvoked = false;
if (container != null) {
try {
- LOGGER.info("Shutting down container " + container.toString() + " from StreamProcessor");
+ LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
container.shutdown();
+ LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container);
containerShutdownInvoked = true;
- } catch (IllegalContainerStateException icse) {
- LOGGER.info("Container was not running", icse);
+ } catch (Exception exception) {
+ LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
}
}
@@ -191,7 +194,6 @@ public class StreamProcessor {
LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
jobCoordinator.stop();
}
-
}
SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
@@ -199,7 +201,7 @@ public class StreamProcessor {
processorId,
jobModel,
config,
- Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
+ Util.javaMapAsScalaMap(customMetricsReporter),
taskFactory);
}
@@ -213,32 +215,30 @@ public class StreamProcessor {
if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
boolean shutdownComplete = false;
try {
- LOGGER.info("Shutting down container in onJobModelExpired for processor:" + processorId);
+ LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
container.pause();
shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
- LOGGER.info("ShutdownComplete=" + shutdownComplete);
+ LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
} catch (IllegalContainerStateException icse) {
// Ignored since container is not running
- LOGGER.info("Container was not running.", icse);
+ LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
shutdownComplete = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
+ LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
}
- LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete);
if (!shutdownComplete) {
- LOGGER.warn("Container " + container.toString() + " may not have shutdown successfully. " +
- "Stopping the processor.");
+ LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
container = null;
stop();
} else {
- LOGGER.debug("Container " + container.toString() + " shutdown successfully");
+ LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
}
} else {
- LOGGER.debug("Container " + container.toString() + " is not running.");
+ LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
}
} else {
- LOGGER.debug("Container is not instantiated yet.");
+ LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
}
}
@@ -257,19 +257,19 @@ public class StreamProcessor {
processorListener.onStart();
}
} else {
- LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
+ LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
}
}
@Override
public void onContainerStop(boolean pauseByJm) {
if (pauseByJm) {
- LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
+ LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
if (jcContainerShutdownLatch != null) {
jcContainerShutdownLatch.countDown();
}
} else { // sp.stop was called or container stopped by itself
- LOGGER.info("Container " + container.toString() + " stopped.");
+ LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
container = null; // this guarantees that stop() doesn't try to stop container again
stop();
}
@@ -283,7 +283,7 @@ public class StreamProcessor {
LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
}
containerException = t;
- LOGGER.error("Container failed. Stopping the processor.", containerException);
+ LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
container = null;
stop();
}
@@ -291,16 +291,16 @@ public class StreamProcessor {
container = createSamzaContainer(processorId, jobModel);
container.setContainerListener(containerListener);
- LOGGER.info("Starting container " + container.toString());
- executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+ LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build();
+ executorService = Executors.newSingleThreadExecutor(threadFactory);
executorService.submit(container::run);
}
@Override
public void onCoordinatorStop() {
if (executorService != null) {
- LOGGER.info("Shutting down the executor service.");
+ LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
executorService.shutdownNow();
}
if (processorListener != null) {
@@ -312,11 +312,11 @@ public class StreamProcessor {
}
@Override
- public void onCoordinatorFailure(Throwable e) {
- LOGGER.info("Coordinator Failed. Stopping the processor.");
+ public void onCoordinatorFailure(Throwable throwable) {
+ LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
stop();
if (processorListener != null) {
- processorListener.onFailure(e);
+ processorListener.onFailure(throwable);
}
}
};