You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/21 18:09:54 UTC
samza git commit: SAMZA-1647: Fix NPE in onJobModelExpired handler in
StreamProcessor.
Repository: samza
Updated Branches:
refs/heads/master 72ad7523f -> 02153fa50
SAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor.
**Changes:**
* Switch to an explicit lock in StreamProcessor to simplify state updates.
* Replace synchronized in ZkJobCoordinator.stop() with an AtomicBoolean guard to prevent potential deadlocks
between two threads (where one thread holds the StreamProcessor lock and the other thread holds the ZkJobCoordinator lock).
* Misc cleanups in StreamProcessor: Remove volatile qualifiers from state variables. Stop re-instantiating the
executorService in onNewJobModel.
* ZkJobCoordinator cleanups: Make some state variables immutable (final).
**NOTE**: The classes in which these changes were made were anonymous inner classes,
so adding proper unit tests would require a substantial refactor.
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Reviewers: Jagadish <ja...@apache.org>
Closes #493 from shanthoosh/fix_npe_in_jobmodel_expired_handler
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/02153fa5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/02153fa5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/02153fa5
Branch: refs/heads/master
Commit: 02153fa506e38b2e7f01c0374089e200bfe1e363
Parents: 72ad752
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Mon May 21 11:09:44 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon May 21 11:09:44 2018 -0700
----------------------------------------------------------------------
.../apache/samza/processor/StreamProcessor.java | 216 ++++++++++---------
.../org/apache/samza/zk/ZkJobCoordinator.java | 84 ++++----
2 files changed, 152 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 40deb1b..73f32e7 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -66,16 +66,16 @@ public class StreamProcessor {
private final Config config;
private final long taskShutdownMs;
private final String processorId;
+ private final ExecutorService executorService;
+ private final Object lock = new Object();
- private ExecutorService executorService;
-
- private volatile SamzaContainer container = null;
- private volatile Throwable containerException = null;
+ private SamzaContainer container = null;
+ private Throwable containerException = null;
+ private boolean processorOnStartCalled = false;
// Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
// stopped due to re-balancing
volatile CountDownLatch jcContainerShutdownLatch;
- private volatile boolean processorOnStartCalled = false;
@VisibleForTesting
JobCoordinatorListener jobCoordinatorListener = null;
@@ -97,7 +97,7 @@ public class StreamProcessor {
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
- this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener, null);
+ this(config, customMetricsReporters, asyncStreamTaskFactory, processorListener, null);
}
/**
@@ -110,7 +110,7 @@ public class StreamProcessor {
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
- this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
+ this(config, customMetricsReporters, streamTaskFactory, processorListener, null);
}
/* package private */
@@ -134,8 +134,9 @@ public class StreamProcessor {
this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator();
this.jobCoordinatorListener = createJobCoordinatorListener();
this.jobCoordinator.setListener(jobCoordinatorListener);
-
- processorId = this.jobCoordinator.getProcessorId();
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
+ this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+ this.processorId = this.jobCoordinator.getProcessorId();
}
/**
@@ -175,32 +176,28 @@ public class StreamProcessor {
* If container is not running, then this method will simply shutdown the {@link JobCoordinator}.
*
*/
- public synchronized void stop() {
- boolean containerShutdownInvoked = false;
- if (container != null) {
- try {
- LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
- container.shutdown();
- LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container);
- containerShutdownInvoked = true;
- } catch (Exception exception) {
- LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+ public void stop() {
+ synchronized (lock) {
+ boolean containerShutdownInvoked = false;
+ if (container != null) {
+ try {
+ LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
+ container.shutdown();
+ containerShutdownInvoked = true;
+ } catch (Exception exception) {
+ LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+ }
}
- }
- if (!containerShutdownInvoked) {
- LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
- jobCoordinator.stop();
+ if (!containerShutdownInvoked) {
+ LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+ jobCoordinator.stop();
+ }
}
}
SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
- return SamzaContainer.apply(
- processorId,
- jobModel,
- config,
- ScalaJavaUtil.toScalaMap(customMetricsReporter),
- taskFactory);
+ return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
}
JobCoordinatorListener createJobCoordinatorListener() {
@@ -208,91 +205,52 @@ public class StreamProcessor {
@Override
public void onJobModelExpired() {
- if (container != null) {
- SamzaContainerStatus status = container.getStatus();
- if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
- boolean shutdownComplete = false;
- try {
- LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
- container.pause();
- shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
- LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
- } catch (IllegalContainerStateException icse) {
- // Ignored since container is not running
- LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
- shutdownComplete = true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
- }
- if (!shutdownComplete) {
- LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
- container = null;
- stop();
+ synchronized (lock) {
+ if (container != null) {
+ SamzaContainerStatus status = container.getStatus();
+ if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
+ boolean shutdownComplete = false;
+ try {
+ LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container,
+ processorId);
+ container.pause();
+ shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+ LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
+ } catch (IllegalContainerStateException icse) {
+ // Ignored since container is not running
+ LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
+ shutdownComplete = true;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
+ }
+ if (!shutdownComplete) {
+ LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
+ container = null;
+ stop();
+ } else {
+ LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
+ }
} else {
- LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
+ LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
}
} else {
- LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
+ LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
}
- } else {
- LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
}
}
@Override
public void onNewJobModel(String processorId, JobModel jobModel) {
- jcContainerShutdownLatch = new CountDownLatch(1);
-
- SamzaContainerListener containerListener = new SamzaContainerListener() {
- @Override
- public void onContainerStart() {
- if (!processorOnStartCalled) {
- // processorListener is called on start only the first time the container starts.
- // It is not called after every re-balance of partitions among the processors
- processorOnStartCalled = true;
- if (processorListener != null) {
- processorListener.onStart();
- }
- } else {
- LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
- }
- }
-
- @Override
- public void onContainerStop(boolean pauseByJm) {
- if (pauseByJm) {
- LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- }
- } else { // sp.stop was called or container stopped by itself
- LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
- container = null; // this guarantees that stop() doesn't try to stop container again
- stop();
- }
- }
-
- @Override
- public void onContainerFailed(Throwable t) {
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- } else {
- LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
- }
- containerException = t;
- LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
- container = null;
- stop();
- }
- };
-
- container = createSamzaContainer(processorId, jobModel);
- container.setContainerListener(containerListener);
- LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build();
- executorService = Executors.newSingleThreadExecutor(threadFactory);
- executorService.submit(container::run);
+ synchronized (lock) {
+ jcContainerShutdownLatch = new CountDownLatch(1);
+ container = createSamzaContainer(processorId, jobModel);
+ container.setContainerListener(new ContainerListener());
+ LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+ executorService.submit(container::run);
+ }
}
@Override
@@ -324,4 +282,52 @@ public class StreamProcessor {
SamzaContainer getContainer() {
return container;
}
+
+ class ContainerListener implements SamzaContainerListener {
+
+ @Override
+ public void onContainerStart() {
+ if (!processorOnStartCalled) {
+ // processorListener is called on start only the first time the container starts.
+ // It is not called after every re-balance of partitions among the processors
+ processorOnStartCalled = true;
+ if (processorListener != null) {
+ processorListener.onStart();
+ }
+ } else {
+ LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
+ }
+ }
+
+ @Override
+ public void onContainerStop(boolean pauseByJm) {
+ if (pauseByJm) {
+ LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
+ if (jcContainerShutdownLatch != null) {
+ jcContainerShutdownLatch.countDown();
+ }
+ } else { // sp.stop was called or container stopped by itself
+ LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
+ synchronized (lock) {
+ container = null; // this guarantees that stop() doesn't try to stop container again
+ stop();
+ }
+ }
+ }
+
+ @Override
+ public void onContainerFailed(Throwable t) {
+ if (jcContainerShutdownLatch != null) {
+ jcContainerShutdownLatch.countDown();
+ } else {
+ LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+ }
+ synchronized (lock) {
+ containerException = t;
+ LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
+ container = null;
+ stop();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 3f16f2b..74abf55 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.I0Itec.zkclient.IZkStateListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.checkpoint.CheckpointManager;
@@ -58,8 +59,6 @@ import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.zookeeper.Watcher.Event.KeeperState.*;
-
/**
* JobCoordinator for stand alone processor managed via Zookeeper.
*/
@@ -92,19 +91,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private final ZkJobCoordinatorMetrics metrics;
private final Map<String, MetricsReporter> reporters;
private final ZkLeaderElector leaderElector;
+ private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
+ private final StreamMetadataCache streamMetadataCache;
+ private final SystemAdmins systemAdmins;
+ private final int debounceTimeMs;
+ private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
- private StreamMetadataCache streamMetadataCache = null;
- private SystemAdmins systemAdmins = null;
-
- @VisibleForTesting
- ScheduleAfterDebounceTime debounceTimer = null;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
- private int debounceTimeMs;
private boolean hasCreatedStreams = false;
- private boolean initiatedShutdown = false;
private String cachedJobModelVersion = null;
- private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
+
+ @VisibleForTesting
+ ScheduleAfterDebounceTime debounceTimer;
ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
this.config = config;
@@ -142,50 +141,49 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
}
@Override
- public synchronized void stop() {
+ public void stop() {
// Make the shutdown idempotent
- if (initiatedShutdown) {
- LOG.debug("Job Coordinator shutdown is already in progress!");
- return;
- }
+ if (initiatedShutdown.compareAndSet(false, true)) {
- LOG.info("Shutting down Job Coordinator...");
- initiatedShutdown = true;
- boolean shutdownSuccessful = false;
+ LOG.info("Shutting down JobCoordinator.");
+ boolean shutdownSuccessful = false;
- // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
- // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
- metrics.isLeader.set(false);
+ // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
+ // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
+ metrics.isLeader.set(false);
- try {
- // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
- }
+ try {
+ // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
+ if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ }
- debounceTimer.stopScheduler();
+ debounceTimer.stopScheduler();
- LOG.debug("Shutting down ZkController.");
- zkController.stop();
+ LOG.debug("Shutting down ZkController.");
+ zkController.stop();
- LOG.debug("Shutting down system admins.");
- systemAdmins.stop();
+ LOG.debug("Shutting down system admins.");
+ systemAdmins.stop();
- LOG.debug("Shutting down metrics.");
- shutdownMetrics();
+ LOG.debug("Shutting down metrics.");
+ shutdownMetrics();
- if (coordinatorListener != null) {
- coordinatorListener.onCoordinatorStop();
- }
+ if (coordinatorListener != null) {
+ coordinatorListener.onCoordinatorStop();
+ }
- shutdownSuccessful = true;
- } catch (Throwable t) {
- LOG.error("Encountered errors during job coordinator stop.", t);
- if (coordinatorListener != null) {
- coordinatorListener.onCoordinatorFailure(t);
+ shutdownSuccessful = true;
+ } catch (Throwable t) {
+ LOG.error("Encountered errors during job coordinator stop.", t);
+ if (coordinatorListener != null) {
+ coordinatorListener.onCoordinatorFailure(t);
+ }
+ } finally {
+ LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
}
- } finally {
- LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
+ } else {
+ LOG.info("Job Coordinator shutdown is in progress!");
}
}