You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/21 18:09:54 UTC

samza git commit: SAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor.

Repository: samza
Updated Branches:
  refs/heads/master 72ad7523f -> 02153fa50


SAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor.

**Changes:**
* Switch to using an explicit lock object in StreamProcessor to simplify state updates.
* Stop using synchronized methods in ZkJobCoordinator to prevent potential deadlocks
between two threads (where one thread holds the StreamProcessor lock and the other thread holds the ZkJobCoordinator lock).
* Misc cleanups in StreamProcessor: remove the volatile qualifiers from state variables, and stop re-instantiating the
executorService in onNewJobModel.
* ZkJobCoordinator cleanups: make some state variables immutable (final).

**NOTE**: The classes in which these changes were made were anonymous inner classes,
so adding proper unit tests would require a significant refactor.

Author: Shanthoosh Venkataraman <sa...@gmail.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #493 from shanthoosh/fix_npe_in_jobmodel_expired_handler


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/02153fa5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/02153fa5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/02153fa5

Branch: refs/heads/master
Commit: 02153fa506e38b2e7f01c0374089e200bfe1e363
Parents: 72ad752
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Mon May 21 11:09:44 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Mon May 21 11:09:44 2018 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java | 216 ++++++++++---------
 .../org/apache/samza/zk/ZkJobCoordinator.java   |  84 ++++----
 2 files changed, 152 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 40deb1b..73f32e7 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -66,16 +66,16 @@ public class StreamProcessor {
   private final Config config;
   private final long taskShutdownMs;
   private final String processorId;
+  private final ExecutorService executorService;
+  private final Object lock = new Object();
 
-  private ExecutorService executorService;
-
-  private volatile SamzaContainer container = null;
-  private volatile Throwable containerException = null;
+  private SamzaContainer container = null;
+  private Throwable containerException = null;
+  private boolean processorOnStartCalled = false;
 
   // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
   // stopped due to re-balancing
   volatile CountDownLatch jcContainerShutdownLatch;
-  private volatile boolean processorOnStartCalled = false;
 
   @VisibleForTesting
   JobCoordinatorListener jobCoordinatorListener = null;
@@ -97,7 +97,7 @@ public class StreamProcessor {
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, (Object) asyncStreamTaskFactory, processorListener, null);
+    this(config, customMetricsReporters, asyncStreamTaskFactory, processorListener, null);
   }
 
   /**
@@ -110,7 +110,7 @@ public class StreamProcessor {
    */
   public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory, StreamProcessorLifecycleListener processorListener) {
-    this(config, customMetricsReporters, (Object) streamTaskFactory, processorListener, null);
+    this(config, customMetricsReporters, streamTaskFactory, processorListener, null);
   }
 
   /* package private */
@@ -134,8 +134,9 @@ public class StreamProcessor {
     this.jobCoordinator = (jobCoordinator != null) ? jobCoordinator : getJobCoordinator();
     this.jobCoordinatorListener = createJobCoordinatorListener();
     this.jobCoordinator.setListener(jobCoordinatorListener);
-
-    processorId = this.jobCoordinator.getProcessorId();
+    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).setDaemon(true).build();
+    this.executorService = Executors.newSingleThreadExecutor(threadFactory);
+    this.processorId = this.jobCoordinator.getProcessorId();
   }
 
   /**
@@ -175,32 +176,28 @@ public class StreamProcessor {
    * If container is not running, then this method will simply shutdown the {@link JobCoordinator}.
    *
    */
-  public synchronized void stop() {
-    boolean containerShutdownInvoked = false;
-    if (container != null) {
-      try {
-        LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
-        container.shutdown();
-        LOGGER.info("Waiting {} milliseconds for the container: {} to shutdown.", taskShutdownMs, container);
-        containerShutdownInvoked = true;
-      } catch (Exception exception) {
-        LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+  public void stop() {
+    synchronized (lock) {
+      boolean containerShutdownInvoked = false;
+      if (container != null) {
+        try {
+          LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
+          container.shutdown();
+          containerShutdownInvoked = true;
+        } catch (Exception exception) {
+          LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+        }
       }
-    }
 
-    if (!containerShutdownInvoked) {
-      LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
-      jobCoordinator.stop();
+      if (!containerShutdownInvoked) {
+        LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+        jobCoordinator.stop();
+      }
     }
   }
 
   SamzaContainer createSamzaContainer(String processorId, JobModel jobModel) {
-    return SamzaContainer.apply(
-        processorId,
-        jobModel,
-        config,
-        ScalaJavaUtil.toScalaMap(customMetricsReporter),
-        taskFactory);
+    return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
   }
 
   JobCoordinatorListener createJobCoordinatorListener() {
@@ -208,91 +205,52 @@ public class StreamProcessor {
 
       @Override
       public void onJobModelExpired() {
-        if (container != null) {
-          SamzaContainerStatus status = container.getStatus();
-          if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
-            boolean shutdownComplete = false;
-            try {
-              LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
-              container.pause();
-              shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
-              LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
-            } catch (IllegalContainerStateException icse) {
-              // Ignored since container is not running
-              LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
-              shutdownComplete = true;
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-              LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
-            }
-            if (!shutdownComplete) {
-              LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
-              container = null;
-              stop();
+        synchronized (lock) {
+          if (container != null) {
+            SamzaContainerStatus status = container.getStatus();
+            if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
+              boolean shutdownComplete = false;
+              try {
+                LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container,
+                    processorId);
+                container.pause();
+                shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+                LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
+              } catch (IllegalContainerStateException icse) {
+                // Ignored since container is not running
+                LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
+                shutdownComplete = true;
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
+              } catch (Exception e) {
+                LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
+              }
+              if (!shutdownComplete) {
+                LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
+                container = null;
+                stop();
+              } else {
+                LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
+              }
             } else {
-              LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
+              LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
             }
           } else {
-            LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
+            LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
           }
-        } else {
-          LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
         }
       }
 
       @Override
       public void onNewJobModel(String processorId, JobModel jobModel) {
-        jcContainerShutdownLatch = new CountDownLatch(1);
-
-        SamzaContainerListener containerListener = new SamzaContainerListener() {
-          @Override
-          public void onContainerStart() {
-            if (!processorOnStartCalled) {
-              // processorListener is called on start only the first time the container starts.
-              // It is not called after every re-balance of partitions among the processors
-              processorOnStartCalled = true;
-              if (processorListener != null) {
-                processorListener.onStart();
-              }
-            } else {
-              LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
-            }
-          }
-
-          @Override
-          public void onContainerStop(boolean pauseByJm) {
-            if (pauseByJm) {
-              LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
-              if (jcContainerShutdownLatch != null) {
-                jcContainerShutdownLatch.countDown();
-              }
-            } else {  // sp.stop was called or container stopped by itself
-              LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
-              container = null; // this guarantees that stop() doesn't try to stop container again
-              stop();
-            }
-          }
-
-          @Override
-          public void onContainerFailed(Throwable t) {
-            if (jcContainerShutdownLatch != null) {
-              jcContainerShutdownLatch.countDown();
-            } else {
-              LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
-            }
-            containerException = t;
-            LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
-            container = null;
-            stop();
-          }
-        };
-
-        container = createSamzaContainer(processorId, jobModel);
-        container.setContainerListener(containerListener);
-        LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(CONTAINER_THREAD_NAME_FORMAT).build();
-        executorService = Executors.newSingleThreadExecutor(threadFactory);
-        executorService.submit(container::run);
+        synchronized (lock) {
+          jcContainerShutdownLatch = new CountDownLatch(1);
+          container = createSamzaContainer(processorId, jobModel);
+          container.setContainerListener(new ContainerListener());
+          LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+          executorService.submit(container::run);
+        }
       }
 
       @Override
@@ -324,4 +282,52 @@ public class StreamProcessor {
   SamzaContainer getContainer() {
     return container;
   }
+
+  class ContainerListener implements SamzaContainerListener {
+
+    @Override
+    public void onContainerStart() {
+      if (!processorOnStartCalled) {
+        // processorListener is called on start only the first time the container starts.
+        // It is not called after every re-balance of partitions among the processors
+        processorOnStartCalled = true;
+        if (processorListener != null) {
+          processorListener.onStart();
+        }
+      } else {
+        LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
+      }
+    }
+
+    @Override
+    public void onContainerStop(boolean pauseByJm) {
+      if (pauseByJm) {
+        LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
+        if (jcContainerShutdownLatch != null) {
+          jcContainerShutdownLatch.countDown();
+        }
+      } else {  // sp.stop was called or container stopped by itself
+        LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
+        synchronized (lock) {
+          container = null; // this guarantees that stop() doesn't try to stop container again
+          stop();
+        }
+      }
+    }
+
+    @Override
+    public void onContainerFailed(Throwable t) {
+      if (jcContainerShutdownLatch != null) {
+        jcContainerShutdownLatch.countDown();
+      } else {
+        LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+      }
+      synchronized (lock) {
+        containerException = t;
+        LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
+        container = null;
+        stop();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/02153fa5/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 3f16f2b..74abf55 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.checkpoint.CheckpointManager;
@@ -58,8 +59,6 @@ import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.zookeeper.Watcher.Event.KeeperState.*;
-
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
@@ -92,19 +91,19 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final ZkJobCoordinatorMetrics metrics;
   private final Map<String, MetricsReporter> reporters;
   private final ZkLeaderElector leaderElector;
+  private final AtomicBoolean initiatedShutdown = new AtomicBoolean(false);
+  private final StreamMetadataCache streamMetadataCache;
+  private final SystemAdmins systemAdmins;
+  private final int debounceTimeMs;
+  private final Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
 
-  private StreamMetadataCache streamMetadataCache = null;
-  private SystemAdmins systemAdmins = null;
-
-  @VisibleForTesting
-  ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
-  private int debounceTimeMs;
   private boolean hasCreatedStreams = false;
-  private boolean initiatedShutdown = false;
   private String cachedJobModelVersion = null;
-  private Map<TaskName, Integer> changeLogPartitionMap = new HashMap<>();
+
+  @VisibleForTesting
+  ScheduleAfterDebounceTime debounceTimer;
 
   ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
@@ -142,50 +141,49 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   }
 
   @Override
-  public synchronized void stop() {
+  public void stop() {
     // Make the shutdown idempotent
-    if (initiatedShutdown) {
-      LOG.debug("Job Coordinator shutdown is already in progress!");
-      return;
-    }
+    if (initiatedShutdown.compareAndSet(false, true)) {
 
-    LOG.info("Shutting down Job Coordinator...");
-    initiatedShutdown = true;
-    boolean shutdownSuccessful = false;
+      LOG.info("Shutting down JobCoordinator.");
+      boolean shutdownSuccessful = false;
 
-    // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
-    // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
-    metrics.isLeader.set(false);
+      // Notify the metrics about abandoning the leadership. Moving it up the chain in the shutdown sequence so that
+      // in case of unclean shutdown, we get notified about lack of leader and we can set up some alerts around the absence of leader.
+      metrics.isLeader.set(false);
 
-    try {
-      // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
-      if (coordinatorListener != null) {
-        coordinatorListener.onJobModelExpired();
-      }
+      try {
+        // todo: what does it mean for coordinator listener to be null? why not have it part of constructor?
+        if (coordinatorListener != null) {
+          coordinatorListener.onJobModelExpired();
+        }
 
-      debounceTimer.stopScheduler();
+        debounceTimer.stopScheduler();
 
-      LOG.debug("Shutting down ZkController.");
-      zkController.stop();
+        LOG.debug("Shutting down ZkController.");
+        zkController.stop();
 
-      LOG.debug("Shutting down system admins.");
-      systemAdmins.stop();
+        LOG.debug("Shutting down system admins.");
+        systemAdmins.stop();
 
-      LOG.debug("Shutting down metrics.");
-      shutdownMetrics();
+        LOG.debug("Shutting down metrics.");
+        shutdownMetrics();
 
-      if (coordinatorListener != null) {
-        coordinatorListener.onCoordinatorStop();
-      }
+        if (coordinatorListener != null) {
+          coordinatorListener.onCoordinatorStop();
+        }
 
-      shutdownSuccessful = true;
-    } catch (Throwable t) {
-      LOG.error("Encountered errors during job coordinator stop.", t);
-      if (coordinatorListener != null) {
-        coordinatorListener.onCoordinatorFailure(t);
+        shutdownSuccessful = true;
+      } catch (Throwable t) {
+        LOG.error("Encountered errors during job coordinator stop.", t);
+        if (coordinatorListener != null) {
+          coordinatorListener.onCoordinatorFailure(t);
+        }
+      } finally {
+        LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
       }
-    } finally {
-      LOG.info("Job Coordinator shutdown finished with ShutdownComplete=" + shutdownSuccessful);
+    } else {
+      LOG.info("Job Coordinator shutdown is in progress!");
     }
   }