You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/09 00:59:06 UTC

samza git commit: SAMZA-1150 : Handling Error propagation between ZkJobCoordinator & DebounceTimer

Repository: samza
Updated Branches:
  refs/heads/master 28afae09d -> 2d47ee804


SAMZA-1150 : Handling Error propagation between ZkJobCoordinator & DebounceTimer

This PR depends on PR #153
* Treats all errors in the JobCoordinator as FATAL and shuts down the StreamProcessor
* [Bug] Fixed bug reported in SAMZA-1241
* Introduced a failure callback associated with the debounce timer (the same callback is invoked for any Runnable failure)

**TBD**: some more unit tests

Author: Navina Ramesh <na...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>, Prateek Maheshwari <pm...@linkedin.com>

Closes #166 from navina/SAMZA-1150


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2d47ee80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2d47ee80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2d47ee80

Branch: refs/heads/master
Commit: 2d47ee8048d71466b1da695832d009f9dfea0b15
Parents: 28afae0
Author: Navina Ramesh <na...@apache.org>
Authored: Mon May 8 17:58:55 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Mon May 8 17:58:55 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ScheduleAfterDebounceTime.java     | 40 ++++++++--
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 79 ++++++++------------
 .../samza/system/StreamMetadataCache.scala      | 20 ++++-
 .../samza/zk/TestScheduleAfterDebounceTime.java | 23 +++++-
 4 files changed, 104 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 21572f5..5cfd37a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * This class allows scheduling a Runnable actions after some debounce time.
  * When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * ZK based standalone app.
  */
 public class ScheduleAfterDebounceTime {
-  public static final Logger LOGGER = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+  public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
   public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
 
   // Here we predefine some actions which are used in the ZK based standalone app.
@@ -51,29 +50,54 @@ public class ScheduleAfterDebounceTime {
 
   public static final int DEBOUNCE_TIME_MS = 2000;
 
+  private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
+
   private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
       new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build());
   private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
 
+  // Ideally, this should be only used for testing. But ZkBarrierForVersionUpgrades uses it. This needs to be fixed.
+  // TODO: Timer shouldn't be passed around the components. It should be associated with the JC or the caller of
+  // coordinationUtils.
+  public ScheduleAfterDebounceTime() {
+    this.scheduledTaskFailureCallback = null;
+  }
+
+  public ScheduleAfterDebounceTime(ScheduledTaskFailureCallback errorScheduledTaskFailureCallback) {
+    this.scheduledTaskFailureCallback = errorScheduledTaskFailureCallback;
+  }
+
   synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
     // check if this action has been scheduled already
     ScheduledFuture sf = futureHandles.get(actionName);
     if (sf != null && !sf.isDone()) {
-      LOGGER.info("cancel future for " + actionName);
+      LOG.info("cancel future for " + actionName);
       // attempt to cancel
       if (!sf.cancel(false)) {
         try {
           sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           // we ignore the exception
-          LOGGER.warn("cancel for action " + actionName + " failed with ", e);
+          LOG.warn("cancel for action " + actionName + " failed with ", e);
         }
       }
       futureHandles.remove(actionName);
     }
     // schedule a new task
-    sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS);
-    LOGGER.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
+    sf = scheduledExecutorService.schedule(() -> {
+        try {
+          runnable.run();
+          LOG.debug(actionName + " completed successfully.");
+        } catch (Throwable t) {
+          LOG.error(actionName + " threw an exception.", t);
+          if (scheduledTaskFailureCallback != null) {
+            scheduledTaskFailureCallback.onError(t);
+          }
+        }
+      },
+     debounceTimeMs,
+     TimeUnit.MILLISECONDS);
+    LOG.info("scheduled " + actionName + " in " + debounceTimeMs);
     futureHandles.put(actionName, sf);
   }
 
@@ -81,4 +105,8 @@ public class ScheduleAfterDebounceTime {
     // shutdown executor service
     scheduledExecutorService.shutdown();
   }
+
+  interface ScheduledTaskFailureCallback {
+    void onError(Throwable throwable);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 0ac9e8e..37eba2d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,91 +18,74 @@
  */
 package org.apache.samza.zk;
 
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.*;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * JobCoordinator for stand alone processor managed via Zookeeper.
  */
 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
-  private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
   private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
+  // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
+  // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
+  private static final int METADATA_CACHE_TTL_MS = 5000;
 
   private final ZkUtils zkUtils;
   private final String processorId;
   private final ZkController zkController;
-  private final ScheduleAfterDebounceTime debounceTimer;
-  private final StreamMetadataCache  streamMetadataCache;
+
   private final Config config;
   private final CoordinationUtils coordinationUtils;
 
+  private StreamMetadataCache streamMetadataCache = null;
+  private ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
 
   public ZkJobCoordinator(Config config) {
-    this.debounceTimer = new ScheduleAfterDebounceTime();
     this.config = config;
     this.processorId = createProcessorId(config);
     this.coordinationUtils = new ZkCoordinationServiceFactory()
         .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
     this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
-    LeaderElector leaderElector = new ZkLeaderElector(this.processorId, zkUtils);
+    LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
-
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
-    streamMetadataCache = getStreamMetadataCache();
-  }
-
-  private StreamMetadataCache getStreamMetadataCache() {
-    // model generation - NEEDS TO BE REVIEWED
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
-        log.error(msg);
-        throw new SamzaException(msg);
-      }
-      SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
-    }
-
-    return new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
   }
 
   @Override
   public void start() {
+    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+    debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+        LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+        stop();
+      });
+
     zkController.register();
   }
 
   @Override
-  public void stop() {
+  public synchronized void stop() {
     if (coordinatorListener != null) {
       coordinatorListener.onJobModelExpired();
     }
@@ -131,7 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   //////////////////////////////////////////////// LEADER stuff ///////////////////////////
   @Override
   public void onProcessorChange(List<String> processors) {
-    log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
+    LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
         ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
   }
@@ -148,30 +131,29 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   public void onNewJobModelAvailable(final String version) {
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
-        log.info("pid=" + processorId + "new JobModel available");
+        LOG.info("pid=" + processorId + "new JobModel available");
         // stop current work
         if (coordinatorListener != null) {
           coordinatorListener.onJobModelExpired();
         }
-        log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+        LOG.info("pid=" + processorId + "new JobModel available.Container stopped.");
         // get the new job model
         newJobModel = zkUtils.getJobModel(version);
 
-        log.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
+        LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
         // update ZK and wait for all the processors to get this new version
-        ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(
-            JOB_MODEL_UPGRADE_BARRIER);
+        ZkBarrierForVersionUpgrade barrier =
+            (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_UPGRADE_BARRIER);
         barrier.waitForBarrier(version, processorId, () -> onNewJobModelConfirmed(version));
       });
   }
 
   @Override
   public void onNewJobModelConfirmed(String version) {
-    log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+    LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
     // get the new Model
     JobModel jobModel = getJobModel();
-    log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
 
     // start the container with the new model
     if (coordinatorListener != null) {
@@ -213,27 +195,26 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     String currentJMVersion  = zkUtils.getJobModelVersion();
     String nextJMVersion;
     if (currentJMVersion == null) {
-      log.info("pid=" + processorId + "generating first version of the model");
+      LOG.info("pid=" + processorId + "generating first version of the model");
       nextJMVersion = "1";
     } else {
       nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
     }
-    log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
+    LOG.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
 
     List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
     for (String processorPid : currentProcessorsIds) {
       containerIds.add(processorPid);
     }
-    log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
+    LOG.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
 
     JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
         containerIds);
 
-    log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+    LOG.info("pid=" + processorId + "Generated jobModel: " + jobModel);
 
     // publish the new job model first
     zkUtils.publishJobModel(nextJMVersion, jobModel);
-    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
 
     // start the barrier for the job model update
     BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
@@ -242,13 +223,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // publish new JobModel version
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
-    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+    LOG.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
   }
 
   class LeaderElectorListenerImpl implements LeaderElectorListener {
     @Override
     public void onBecomingLeader() {
-      log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+      LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
       zkController.subscribeToProcessorChange();
       debounceTimer.scheduleAfterDebounceTime(
         ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,

http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 0dd114c..a1b1e27 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -19,11 +19,27 @@
 
 package org.apache.samza.system
 
-import org.apache.samza.util.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.util.{Util, Logging, Clock, SystemClock}
 import org.apache.samza.SamzaException
-import org.apache.samza.util.{Clock, SystemClock}
 import scala.collection.JavaConverters._
+import org.apache.samza.config.SystemConfig.Config2System
 
+object StreamMetadataCache {
+  def apply(cacheTtlMs: Int = 5000, config: Config): StreamMetadataCache = {
+    val systemNames = config.getSystemNames.toSet
+    // Map the name of each system to the corresponding SystemAdmin
+    val systemAdmins = systemNames.map(systemName => {
+      val systemFactoryClassName = config
+        .getSystemFactory(systemName)
+        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      systemName -> systemFactory.getAdmin(systemName, config)
+    }).toMap
+
+    new StreamMetadataCache(systemAdmins, cacheTtlMs, SystemClock.instance)
+  }
+}
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
  * 5 seconds), so that we can make many metadata requests in quick succession without

http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index cd396ad..d3152be 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -68,7 +68,6 @@ public class TestScheduleAfterDebounceTime {
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
     scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc);
-
     // next schedule should cancel the previous one with the same name
     scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
       {
@@ -88,4 +87,26 @@ public class TestScheduleAfterDebounceTime {
 
     scheduledQueue.stopScheduler();
   }
+
+  @Test
+  public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
+    final CountDownLatch latch = new CountDownLatch(1);
+    ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(e -> {
+        Assert.assertEquals(RuntimeException.class, e.getClass());
+        latch.countDown();
+      });
+
+    scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () ->
+      {
+        throw new RuntimeException("From the runnable!");
+      });
+
+    final TestObj testObj = new TestObj();
+    scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME * 2, testObj::inc);
+
+    boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
+    Assert.assertTrue("Latch timed-out.", result);
+    Assert.assertEquals(0, testObj.get());
+    scheduledQueue.stopScheduler();
+  }
 }