You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/09 00:59:06 UTC
samza git commit: SAMZA-1150 : Handling Error propagation between
ZkJobCoordinator & DebounceTimer
Repository: samza
Updated Branches:
refs/heads/master 28afae09d -> 2d47ee804
SAMZA-1150 : Handling Error propagation between ZkJobCoordinator & DebounceTimer
This PR depends on PR #153
* Treats all errors in the JobCoordinator as FATAL and shuts down the StreamProcessor
* [Bug] Fixed bug reported in SAMZA-1241
* Introduced a callback to be associated with the timer (same callback for every Runnable failure)
**TBD**: add more unit tests
Author: Navina Ramesh <na...@apache.org>
Reviewers: Xinyu Liu <xi...@apache.org>, Prateek Maheshwari <pm...@linkedin.com>
Closes #166 from navina/SAMZA-1150
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2d47ee80
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2d47ee80
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2d47ee80
Branch: refs/heads/master
Commit: 2d47ee8048d71466b1da695832d009f9dfea0b15
Parents: 28afae0
Author: Navina Ramesh <na...@apache.org>
Authored: Mon May 8 17:58:55 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Mon May 8 17:58:55 2017 -0700
----------------------------------------------------------------------
.../samza/zk/ScheduleAfterDebounceTime.java | 40 ++++++++--
.../org/apache/samza/zk/ZkJobCoordinator.java | 79 ++++++++------------
.../samza/system/StreamMetadataCache.scala | 20 ++++-
.../samza/zk/TestScheduleAfterDebounceTime.java | 23 +++++-
4 files changed, 104 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
index 21572f5..5cfd37a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ScheduleAfterDebounceTime.java
@@ -31,7 +31,6 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* This class allows scheduling a Runnable actions after some debounce time.
* When the same action is scheduled it needs to cancel the previous one. To accomplish that we keep the previous
@@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory;
* ZK based standalone app.
*/
public class ScheduleAfterDebounceTime {
- public static final Logger LOGGER = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
+ public static final Logger LOG = LoggerFactory.getLogger(ScheduleAfterDebounceTime.class);
public static final long TIMEOUT_MS = 1000 * 10; // timeout to wait for a task to complete
// Here we predefine some actions which are used in the ZK based standalone app.
@@ -51,29 +50,54 @@ public class ScheduleAfterDebounceTime {
public static final int DEBOUNCE_TIME_MS = 2000;
+ private final ScheduledTaskFailureCallback scheduledTaskFailureCallback;
+
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("debounce-thread-%d").setDaemon(true).build());
private final Map<String, ScheduledFuture> futureHandles = new HashMap<>();
+ // Ideally, this should be only used for testing. But ZkBarrierForVersionUpgrades uses it. This needs to be fixed.
+ // TODO: Timer shouldn't be passed around the components. It should be associated with the JC or the caller of
+ // coordinationUtils.
+ public ScheduleAfterDebounceTime() {
+ this.scheduledTaskFailureCallback = null;
+ }
+
+ public ScheduleAfterDebounceTime(ScheduledTaskFailureCallback errorScheduledTaskFailureCallback) {
+ this.scheduledTaskFailureCallback = errorScheduledTaskFailureCallback;
+ }
+
synchronized public void scheduleAfterDebounceTime(String actionName, long debounceTimeMs, Runnable runnable) {
// check if this action has been scheduled already
ScheduledFuture sf = futureHandles.get(actionName);
if (sf != null && !sf.isDone()) {
- LOGGER.info("cancel future for " + actionName);
+ LOG.info("cancel future for " + actionName);
// attempt to cancel
if (!sf.cancel(false)) {
try {
sf.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
// we ignore the exception
- LOGGER.warn("cancel for action " + actionName + " failed with ", e);
+ LOG.warn("cancel for action " + actionName + " failed with ", e);
}
}
futureHandles.remove(actionName);
}
// schedule a new task
- sf = scheduledExecutorService.schedule(runnable, debounceTimeMs, TimeUnit.MILLISECONDS);
- LOGGER.info("DEBOUNCE: scheduled " + actionName + " in " + debounceTimeMs);
+ sf = scheduledExecutorService.schedule(() -> {
+ try {
+ runnable.run();
+ LOG.debug(actionName + " completed successfully.");
+ } catch (Throwable t) {
+ LOG.error(actionName + " threw an exception.", t);
+ if (scheduledTaskFailureCallback != null) {
+ scheduledTaskFailureCallback.onError(t);
+ }
+ }
+ },
+ debounceTimeMs,
+ TimeUnit.MILLISECONDS);
+ LOG.info("scheduled " + actionName + " in " + debounceTimeMs);
futureHandles.put(actionName, sf);
}
@@ -81,4 +105,8 @@ public class ScheduleAfterDebounceTime {
// shutdown executor service
scheduledExecutorService.shutdown();
}
+
+ interface ScheduledTaskFailureCallback {
+ void onError(Throwable throwable);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 0ac9e8e..37eba2d 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -18,91 +18,74 @@
*/
package org.apache.samza.zk;
-import org.apache.samza.SamzaException;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
import org.apache.samza.coordinator.BarrierForVersionUpgrade;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.job.model.JobModel;
-import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
-import org.apache.samza.util.*;
+import org.apache.samza.util.ClassLoaderHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
/**
* JobCoordinator for stand alone processor managed via Zookeeper.
*/
public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
- private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinator.class);
private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
+ // TODO: MetadataCache timeout has to be 0 for the leader so that it can always have the latest information associated
+ // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
+ private static final int METADATA_CACHE_TTL_MS = 5000;
private final ZkUtils zkUtils;
private final String processorId;
private final ZkController zkController;
- private final ScheduleAfterDebounceTime debounceTimer;
- private final StreamMetadataCache streamMetadataCache;
+
private final Config config;
private final CoordinationUtils coordinationUtils;
+ private StreamMetadataCache streamMetadataCache = null;
+ private ScheduleAfterDebounceTime debounceTimer = null;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
public ZkJobCoordinator(Config config) {
- this.debounceTimer = new ScheduleAfterDebounceTime();
this.config = config;
this.processorId = createProcessorId(config);
this.coordinationUtils = new ZkCoordinationServiceFactory()
.getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
- LeaderElector leaderElector = new ZkLeaderElector(this.processorId, zkUtils);
+ LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
-
this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
- streamMetadataCache = getStreamMetadataCache();
- }
-
- private StreamMetadataCache getStreamMetadataCache() {
- // model generation - NEEDS TO BE REVIEWED
- JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
- Map<String, SystemAdmin> systemAdmins = new HashMap<>();
- for (String systemName: systemConfig.getSystemNames()) {
- String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
- if (systemFactoryClassName == null) {
- String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
- log.error(msg);
- throw new SamzaException(msg);
- }
- SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
- systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
- }
-
- return new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
}
@Override
public void start() {
+ streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+ debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+ LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+ stop();
+ });
+
zkController.register();
}
@Override
- public void stop() {
+ public synchronized void stop() {
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
@@ -131,7 +114,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
//////////////////////////////////////////////// LEADER stuff ///////////////////////////
@Override
public void onProcessorChange(List<String> processors) {
- log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
+ LOG.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> doOnProcessorChange(processors));
}
@@ -148,30 +131,29 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
public void onNewJobModelAvailable(final String version) {
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
{
- log.info("pid=" + processorId + "new JobModel available");
+ LOG.info("pid=" + processorId + "new JobModel available");
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
- log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+ LOG.info("pid=" + processorId + "new JobModel available.Container stopped.");
// get the new job model
newJobModel = zkUtils.getJobModel(version);
- log.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
+ LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
// update ZK and wait for all the processors to get this new version
- ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(
- JOB_MODEL_UPGRADE_BARRIER);
+ ZkBarrierForVersionUpgrade barrier =
+ (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_UPGRADE_BARRIER);
barrier.waitForBarrier(version, processorId, () -> onNewJobModelConfirmed(version));
});
}
@Override
public void onNewJobModelConfirmed(String version) {
- log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+ LOG.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
// get the new Model
JobModel jobModel = getJobModel();
- log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
// start the container with the new model
if (coordinatorListener != null) {
@@ -213,27 +195,26 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
String currentJMVersion = zkUtils.getJobModelVersion();
String nextJMVersion;
if (currentJMVersion == null) {
- log.info("pid=" + processorId + "generating first version of the model");
+ LOG.info("pid=" + processorId + "generating first version of the model");
nextJMVersion = "1";
} else {
nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
}
- log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
+ LOG.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
for (String processorPid : currentProcessorsIds) {
containerIds.add(processorPid);
}
- log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
+ LOG.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
containerIds);
- log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+ LOG.info("pid=" + processorId + "Generated jobModel: " + jobModel);
// publish the new job model first
zkUtils.publishJobModel(nextJMVersion, jobModel);
- log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
// start the barrier for the job model update
BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
@@ -242,13 +223,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// publish new JobModel version
zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
- log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+ LOG.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
}
class LeaderElectorListenerImpl implements LeaderElectorListener {
@Override
public void onBecomingLeader() {
- log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+ LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
zkController.subscribeToProcessorChange();
debounceTimer.scheduleAfterDebounceTime(
ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 0dd114c..a1b1e27 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
@@ -19,11 +19,27 @@
package org.apache.samza.system
-import org.apache.samza.util.Logging
+import org.apache.samza.config.Config
+import org.apache.samza.util.{Util, Logging, Clock, SystemClock}
import org.apache.samza.SamzaException
-import org.apache.samza.util.{Clock, SystemClock}
import scala.collection.JavaConverters._
+import org.apache.samza.config.SystemConfig.Config2System
+object StreamMetadataCache {
+ def apply(cacheTtlMs: Int = 5000, config: Config): StreamMetadataCache = {
+ val systemNames = config.getSystemNames.toSet
+ // Map the name of each system to the corresponding SystemAdmin
+ val systemAdmins = systemNames.map(systemName => {
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ systemName -> systemFactory.getAdmin(systemName, config)
+ }).toMap
+
+ new StreamMetadataCache(systemAdmins, cacheTtlMs, SystemClock.instance)
+ }
+}
/**
* Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
* 5 seconds), so that we can make many metadata requests in quick succession without
http://git-wip-us.apache.org/repos/asf/samza/blob/2d47ee80/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index cd396ad..d3152be 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -68,7 +68,6 @@ public class TestScheduleAfterDebounceTime {
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, testObj::inc);
-
// next schedule should cancel the previous one with the same name
scheduledQueue.scheduleAfterDebounceTime("TEST1", 2 * WAIT_TIME, () ->
{
@@ -88,4 +87,26 @@ public class TestScheduleAfterDebounceTime {
scheduledQueue.stopScheduler();
}
+
+ @Test
+ public void testRunnableWithExceptionInvokesCallback() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ ScheduleAfterDebounceTime scheduledQueue = new ScheduleAfterDebounceTime(e -> {
+ Assert.assertEquals(RuntimeException.class, e.getClass());
+ latch.countDown();
+ });
+
+ scheduledQueue.scheduleAfterDebounceTime("TEST1", WAIT_TIME, () ->
+ {
+ throw new RuntimeException("From the runnable!");
+ });
+
+ final TestObj testObj = new TestObj();
+ scheduledQueue.scheduleAfterDebounceTime("TEST2", WAIT_TIME * 2, testObj::inc);
+
+ boolean result = latch.await(5 * WAIT_TIME, TimeUnit.MILLISECONDS);
+ Assert.assertTrue("Latch timed-out.", result);
+ Assert.assertEquals(0, testObj.get());
+ scheduledQueue.stopScheduler();
+ }
}