You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/07/10 23:35:02 UTC
samza git commit: SAMZA-1730: Adding state validations in
StreamProcessor before any lifecycle operation and group coordination.
Repository: samza
Updated Branches:
refs/heads/master 08cfad990 -> 93b397e84
SAMZA-1730: Adding state validations in StreamProcessor before any lifecycle operation and group coordination.
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #535 from shanthoosh/abced
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/93b397e8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/93b397e8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/93b397e8
Branch: refs/heads/master
Commit: 93b397e84056f311d238be72181b2df60cccb6c0
Parents: 08cfad9
Author: Shanthoosh Venkataraman <sa...@gmail.com>
Authored: Tue Jul 10 16:34:58 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Jul 10 16:34:58 2018 -0700
----------------------------------------------------------------------
.../samza/container/SamzaContainerListener.java | 9 +-
.../apache/samza/processor/StreamProcessor.java | 281 ++++++++++++-------
.../samza/runtime/LocalContainerRunner.java | 2 +-
.../standalone/PassthroughJobCoordinator.java | 1 +
.../apache/samza/container/SamzaContainer.scala | 16 +-
.../samza/job/local/ThreadJobFactory.scala | 2 +-
.../samza/processor/TestStreamProcessor.java | 162 ++++++++++-
.../samza/container/TestSamzaContainer.scala | 10 +-
.../samza/processor/TestZkStreamProcessor.java | 4 +-
.../TestZkStreamProcessorFailures.java | 4 +-
.../processor/TestZkStreamProcessorSession.java | 4 +-
.../processor/TestZkLocalApplicationRunner.java | 145 +++++++---
12 files changed, 456 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
index a9c3b2c..fe8bc66 100644
--- a/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
+++ b/samza-core/src/main/java/org/apache/samza/container/SamzaContainerListener.java
@@ -37,19 +37,16 @@ public interface SamzaContainerListener {
* <br>
* <b>Note</b>: This will be the last call after completely shutting down the SamzaContainer without any
* exceptions/errors.
- * @param pausedByJm boolean indicating why the container was stopped. It should be {@literal true}, iff the container
- * was stopped as a result of an expired {@link org.apache.samza.job.model.JobModel}. Otherwise,
- * it should be {@literal false}
*/
- void onContainerStop(boolean pausedByJm);
+ void onContainerStop();
/**
* Method invoked when the {@link org.apache.samza.container.SamzaContainer} has transitioned to
* {@link org.apache.samza.SamzaContainerStatus#FAILED} state. Details on state transitions can be found in
* {@link org.apache.samza.SamzaContainerStatus}
* <br>
- * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop(boolean)}.
- * @param t Throwable that caused the container failure.
+ * <b>Note</b>: {@link #onContainerFailed(Throwable)} is mutually exclusive to {@link #onContainerStop()}.
+ * @param t Throwable that caused the container failure.
*/
void onContainerFailed(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 73f32e7..22550d5 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -19,6 +19,8 @@
package org.apache.samza.processor;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -26,7 +28,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
@@ -49,10 +50,44 @@ import org.slf4j.LoggerFactory;
/**
* StreamProcessor can be embedded in any application or executed in a distributed environment (aka cluster) as an
* independent process.
+ *
* <p>
*
* <b>Note</b>: A single JVM can create multiple StreamProcessor instances. It is safe to create StreamProcessor instances in
- * multiple threads.
+ * multiple threads. This class is thread safe.
+ *
+ * </p>
+ *
+ * <pre>
+ * A StreamProcessor could be in any one of the following states:
+ * NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED.
+ *
+ * Describes the valid state transitions of the {@link StreamProcessor}.
+ *
+ *
+ * ────────────────────────────────
+ * │ │
+ * │ │
+ * │ │
+ * │ │
+ * New StreamProcessor.start() Rebalance triggered V Receives JobModel │
+ * StreamProcessor ──────────▶ NEW ───────────────────────────▶ STARTED ──────────────────▶ IN_REBALANCE ─────────────────────▶ RUNNING
+ * Creation │ │ by group leader │ and starts Container │
+ * │ │ │ │
+ * Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop() Stre│amProcessor.stop()
+ * │ │ │ │
+ * │ │ │ │
+ * │ │ │ │
+ * V V V V
+ * ───────────────────────────▶ STOPPING D──────────────────────────────────────────────────────────
+ * │
+ * │
+ * After JobCoordinator and SamzaContainer had shutdown.
+ * │
+ * V
+ * STOPPED
+ *
+ * </pre>
*/
@InterfaceStability.Evolving
public class StreamProcessor {
@@ -69,31 +104,59 @@ public class StreamProcessor {
private final ExecutorService executorService;
private final Object lock = new Object();
- private SamzaContainer container = null;
private Throwable containerException = null;
private boolean processorOnStartCalled = false;
- // Latch used to synchronize between the JobCoordinator thread and the container thread, when the container is
- // stopped due to re-balancing
- volatile CountDownLatch jcContainerShutdownLatch;
+ volatile CountDownLatch containerShutdownLatch = new CountDownLatch(1);
+
+ /**
+ * Indicates the current status of a {@link StreamProcessor}.
+ */
+ public enum State {
+ STARTED("STARTED"), RUNNING("RUNNING"), STOPPING("STOPPING"), STOPPED("STOPPED"), NEW("NEW"), IN_REBALANCE("IN_REBALANCE");
+
+ private String strVal;
+
+ State(String strVal) {
+ this.strVal = strVal;
+ }
+
+ @Override
+ public String toString() {
+ return strVal;
+ }
+ }
+
+ /**
+ * @return the current state of StreamProcessor.
+ */
+ public State getState() {
+ return state;
+ }
+
+ @VisibleForTesting
+ State state = State.NEW;
+
+ @VisibleForTesting
+ SamzaContainer container = null;
@VisibleForTesting
JobCoordinatorListener jobCoordinatorListener = null;
/**
- * Create an instance of StreamProcessor that encapsulates a JobCoordinator and Samza Container
+ * StreamProcessor encapsulates and manages the lifecycle of {@link JobCoordinator} and {@link SamzaContainer}.
+ *
* <p>
- * JobCoordinator controls how the various StreamProcessor instances belonging to a job coordinate. It is also
- * responsible generating and updating JobModel.
- * When StreamProcessor starts, it starts the JobCoordinator and brings up a SamzaContainer based on the JobModel.
- * SamzaContainer is executed using an ExecutorService.
+ * On startup, StreamProcessor starts the JobCoordinator. It schedules the SamzaContainer to run in an ExecutorService
+ * when it receives a new {@link JobModel} from the JobCoordinator.
* <p>
- * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor, and NOT exposed to the user
*
- * @param config Instance of config object - contains all configuration required for processing
- * @param customMetricsReporters Map of custom MetricReporter instances that are to be injected in the Samza job
+ * <b>Note:</b> Lifecycle of the ExecutorService is fully managed by the StreamProcessor.
+ *
+ * @param config configuration required to launch {@link JobCoordinator} and {@link SamzaContainer}.
+ * @param customMetricsReporters metricReporter instances that will be used by SamzaContainer and JobCoordinator to report metrics.
* @param asyncStreamTaskFactory The {@link AsyncStreamTaskFactory} to be used for creating task instances.
- * @param processorListener listener to the StreamProcessor life cycle
+ * @param processorListener listener to the StreamProcessor life cycle.
*/
public StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters,
AsyncStreamTaskFactory asyncStreamTaskFactory, StreamProcessorLifecycleListener processorListener) {
@@ -101,7 +164,7 @@ public class StreamProcessor {
}
/**
- *Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
+ * Same as {@link #StreamProcessor(Config, Map, AsyncStreamTaskFactory, StreamProcessorLifecycleListener)}, except task
* instances are created using the provided {@link StreamTaskFactory}.
* @param config - config
* @param customMetricsReporters metric Reporter
@@ -114,7 +177,7 @@ public class StreamProcessor {
}
/* package private */
- JobCoordinator getJobCoordinator() {
+ private JobCoordinator getJobCoordinator() {
String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config);
}
@@ -126,6 +189,7 @@ public class StreamProcessor {
StreamProcessor(Config config, Map<String, MetricsReporter> customMetricsReporters, Object taskFactory,
StreamProcessorLifecycleListener processorListener, JobCoordinator jobCoordinator) {
+ Preconditions.checkNotNull(processorListener, "ProcessorListener cannot be null.");
this.taskFactory = taskFactory;
this.config = config;
this.taskShutdownMs = new TaskConfigJava(config).getShutdownMs();
@@ -148,7 +212,14 @@ public class StreamProcessor {
* </p>
*/
public void start() {
- jobCoordinator.start();
+ synchronized (lock) {
+ if (state == State.NEW) {
+ state = State.STARTED;
+ jobCoordinator.start();
+ } else {
+ LOGGER.info("Start is no-op, since the current state is {} and not {}.", state, State.NEW);
+ }
+ }
}
/**
@@ -156,7 +227,7 @@ public class StreamProcessor {
* Asynchronously stops the {@link StreamProcessor}'s running components - {@link SamzaContainer}
* and {@link JobCoordinator}
* </p>
- * There are multiple ways in which the StreamProcessor stops:
+ * The StreamProcessor can be stopped in any of the following ways:
* <ol>
* <li>Caller of StreamProcessor invokes stop()</li>
* <li>Samza Container completes processing (eg. bounded input) and shuts down</li>
@@ -168,7 +239,7 @@ public class StreamProcessor {
* <br>
* If container is running,
* <ol>
- * <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop(boolean)} will trigger
+ * <li>container is shutdown cleanly and {@link SamzaContainerListener#onContainerStop()} will trigger
* {@link JobCoordinator#stop()}</li>
* <li>container fails to shutdown cleanly and {@link SamzaContainerListener#onContainerFailed(Throwable)} will
* trigger {@link JobCoordinator#stop()}</li>
@@ -178,20 +249,22 @@ public class StreamProcessor {
*/
public void stop() {
synchronized (lock) {
- boolean containerShutdownInvoked = false;
- if (container != null) {
+ if (state != State.STOPPING && state != State.STOPPED) {
+ state = State.STOPPING;
try {
LOGGER.info("Shutting down the container: {} of stream processor: {}.", container, processorId);
- container.shutdown();
- containerShutdownInvoked = true;
- } catch (Exception exception) {
- LOGGER.error(String.format("Ignoring the exception during the shutdown of container: %s.", container), exception);
+ boolean hasContainerShutdown = stopSamzaContainer();
+ if (!hasContainerShutdown) {
+ LOGGER.info("Interrupting the container: {} thread to die.", container);
+ executorService.shutdownNow();
+ }
+ } catch (Throwable throwable) {
+ LOGGER.error(String.format("Exception occurred on container: %s shutdown of stream processor: %s.", container, processorId), throwable);
}
- }
-
- if (!containerShutdownInvoked) {
- LOGGER.info("Shutting down JobCoordinator from StreamProcessor");
+ LOGGER.info("Shutting down JobCoordinator of stream processor: {}.", processorId);
jobCoordinator.stop();
+ } else {
+ LOGGER.info("StreamProcessor state is: {}. Ignoring the stop.", state);
}
}
}
@@ -200,44 +273,51 @@ public class StreamProcessor {
return SamzaContainer.apply(processorId, jobModel, config, ScalaJavaUtil.toScalaMap(customMetricsReporter), taskFactory);
}
- JobCoordinatorListener createJobCoordinatorListener() {
+ /**
+ * Stops the {@link SamzaContainer}.
+ * @return true if the {@link SamzaContainer} shut down within task.shutdown.ms (or was not running); false otherwise.
+ */
+ private boolean stopSamzaContainer() {
+ boolean hasContainerShutdown = true;
+ if (container != null) {
+ if (!container.hasStopped()) {
+ try {
+ container.shutdown();
+ LOGGER.info("Waiting {} ms for the container: {} to shutdown.", taskShutdownMs, container);
+ hasContainerShutdown = containerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
+ } catch (IllegalContainerStateException icse) {
+ LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
+ } catch (Exception e) {
+ LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
+ hasContainerShutdown = false;
+ }
+ LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %b.", container, processorId, hasContainerShutdown));
+ } else {
+ LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
+ }
+ }
+ return hasContainerShutdown;
+ }
+
+ private JobCoordinatorListener createJobCoordinatorListener() {
return new JobCoordinatorListener() {
@Override
public void onJobModelExpired() {
synchronized (lock) {
- if (container != null) {
- SamzaContainerStatus status = container.getStatus();
- if (SamzaContainerStatus.NOT_STARTED.equals(status) || SamzaContainerStatus.STARTED.equals(status)) {
- boolean shutdownComplete = false;
- try {
- LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container,
- processorId);
- container.pause();
- shutdownComplete = jcContainerShutdownLatch.await(taskShutdownMs, TimeUnit.MILLISECONDS);
- LOGGER.info(String.format("Shutdown status of container: %s for stream processor: %s is: %s.", container, processorId, shutdownComplete));
- } catch (IllegalContainerStateException icse) {
- // Ignored since container is not running
- LOGGER.info(String.format("Cannot shutdown container: %s for stream processor: %s. Container is not running.", container, processorId), icse);
- shutdownComplete = true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOGGER.warn(String.format("Shutdown of container: %s for stream processor: %s was interrupted", container, processorId), e);
- } catch (Exception e) {
- LOGGER.error("Exception occurred when shutting down the container: {}.", container, e);
- }
- if (!shutdownComplete) {
- LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
- container = null;
- stop();
- } else {
- LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
- }
+ if (state == State.STARTED || state == State.RUNNING) {
+ state = State.IN_REBALANCE;
+ LOGGER.info("Job model expired. Shutting down the container: {} of stream processor: {}.", container, processorId);
+ boolean hasContainerShutdown = stopSamzaContainer();
+ if (!hasContainerShutdown) {
+ LOGGER.warn("Container: {} shutdown was unsuccessful. Stopping the stream processor: {}.", container, processorId);
+ state = State.STOPPING;
+ jobCoordinator.stop();
} else {
- LOGGER.info("Container: {} of the stream processor: {} is not running.", container, processorId);
+ LOGGER.info("Container: {} shutdown completed for stream processor: {}.", container, processorId);
}
} else {
- LOGGER.info("Container is not instantiated for stream processor: {}.", processorId);
+ LOGGER.info("Ignoring onJobModelExpired invocation since the current state is {} and not in {}.", state, ImmutableList.of(State.RUNNING, State.STARTED));
}
}
}
@@ -245,35 +325,42 @@ public class StreamProcessor {
@Override
public void onNewJobModel(String processorId, JobModel jobModel) {
synchronized (lock) {
- jcContainerShutdownLatch = new CountDownLatch(1);
- container = createSamzaContainer(processorId, jobModel);
- container.setContainerListener(new ContainerListener());
- LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
- executorService.submit(container::run);
+ if (state == State.IN_REBALANCE) {
+ containerShutdownLatch = new CountDownLatch(1);
+ container = createSamzaContainer(processorId, jobModel);
+ container.setContainerListener(new ContainerListener());
+ LOGGER.info("Starting the container: {} for the stream processor: {}.", container, processorId);
+ executorService.submit(container);
+ } else {
+ LOGGER.info("Ignoring onNewJobModel invocation since the current state is {} and not {}.", state, State.IN_REBALANCE);
+ }
}
}
@Override
public void onCoordinatorStop() {
- if (executorService != null) {
+ synchronized (lock) {
LOGGER.info("Shutting down the executor service of the stream processor: {}.", processorId);
+ stopSamzaContainer();
executorService.shutdownNow();
+ state = State.STOPPED;
}
- if (processorListener != null) {
- if (containerException != null)
- processorListener.onFailure(containerException);
- else
- processorListener.onShutdown();
- }
+ if (containerException != null)
+ processorListener.onFailure(containerException);
+ else
+ processorListener.onShutdown();
+
}
@Override
public void onCoordinatorFailure(Throwable throwable) {
- LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
- stop();
- if (processorListener != null) {
- processorListener.onFailure(throwable);
+ synchronized (lock) {
+ LOGGER.info(String.format("Coordinator: %s failed with an exception. Stopping the stream processor: %s. Original exception:", jobCoordinator, processorId), throwable);
+ stopSamzaContainer();
+ executorService.shutdownNow();
+ state = State.STOPPED;
}
+ processorListener.onFailure(throwable);
}
};
}
@@ -287,46 +374,36 @@ public class StreamProcessor {
@Override
public void onContainerStart() {
+ LOGGER.warn("Received container start notification for container: {} in stream processor: {}.", container, processorId);
if (!processorOnStartCalled) {
- // processorListener is called on start only the first time the container starts.
- // It is not called after every re-balance of partitions among the processors
+ processorListener.onStart();
processorOnStartCalled = true;
- if (processorListener != null) {
- processorListener.onStart();
- }
- } else {
- LOGGER.warn("Received duplicate container start notification for container: {} in stream processor: {}.", container, processorId);
}
+ state = State.RUNNING;
}
@Override
- public void onContainerStop(boolean pauseByJm) {
- if (pauseByJm) {
- LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- }
- } else { // sp.stop was called or container stopped by itself
- LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
- synchronized (lock) {
- container = null; // this guarantees that stop() doesn't try to stop container again
- stop();
+ public void onContainerStop() {
+ containerShutdownLatch.countDown();
+ synchronized (lock) {
+ if (state == State.IN_REBALANCE) {
+ LOGGER.info("Container: {} of the stream processor: {} was stopped by the JobCoordinator.", container, processorId);
+ } else {
+ LOGGER.info("Container: {} stopped. Stopping the stream processor: {}.", container, processorId);
+ state = State.STOPPING;
+ jobCoordinator.stop();
}
}
}
@Override
public void onContainerFailed(Throwable t) {
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- } else {
- LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
- }
+ containerShutdownLatch.countDown();
synchronized (lock) {
- containerException = t;
LOGGER.error(String.format("Container: %s failed with an exception. Stopping the stream processor: %s. Original exception:", container, processorId), containerException);
- container = null;
- stop();
+ state = State.STOPPING;
+ containerException = t;
+ jobCoordinator.stop();
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 66176d7..e6e622d 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -87,7 +87,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
}
@Override
- public void onContainerStop(boolean invokedExternally) {
+ public void onContainerStop() {
log.info("Container Stopped");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
index 01ee84e..228617a 100644
--- a/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
@@ -89,6 +89,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
}
if (jobModel != null && jobModel.getContainers().containsKey(processorId)) {
if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
coordinatorListener.onNewJobModel(processorId, jobModel);
}
} else {
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index be0fb26..89278ad 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -736,7 +736,6 @@ class SamzaContainer(
@volatile private var status = SamzaContainerStatus.NOT_STARTED
private var exceptionSeen: Throwable = null
- private var paused: Boolean = false
private var containerListener: SamzaContainerListener = null
def getStatus(): SamzaContainerStatus = status
@@ -747,6 +746,8 @@ class SamzaContainer(
containerListener = listener
}
+ def hasStopped(): Boolean = status == SamzaContainerStatus.STOPPED || status == SamzaContainerStatus.FAILED
+
def run {
try {
info("Starting container.")
@@ -824,7 +825,7 @@ class SamzaContainer(
status match {
case SamzaContainerStatus.STOPPED =>
if (containerListener != null) {
- containerListener.onContainerStop(paused)
+ containerListener.onContainerStop()
}
case SamzaContainerStatus.FAILED =>
if (containerListener != null) {
@@ -833,17 +834,6 @@ class SamzaContainer(
}
}
- // TODO: We want to introduce a "PAUSED" state for SamzaContainer in the future so that StreamProcessor can pause and
- // unpause the container when the jobmodel changes.
- /**
- * Marks the [[SamzaContainer]] as being paused by the called due to a change in [[JobModel]] and then, asynchronously
- * shuts down this [[SamzaContainer]]
- */
- def pause(): Unit = {
- paused = true
- shutdown()
- }
-
/**
* <p>
* Asynchronously shuts down this [[SamzaContainer]]
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 029b375..7b83874 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -94,7 +94,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
throw t
}
- override def onContainerStop(pausedOrNot: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
}
override def onContainerStart(): Unit = {
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index fc1259c..052aa29 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.processor;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.SamzaContainerStatus;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
@@ -28,13 +29,13 @@ import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.processor.StreamProcessor.State;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -42,7 +43,8 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -57,6 +59,7 @@ public class TestStreamProcessor {
@Before
public void before() {
+ Mockito.reset();
processorListenerState = new ConcurrentHashMap<ListenerCallback, Boolean>() {
{
put(ListenerCallback.ON_START, false);
@@ -103,12 +106,12 @@ public class TestStreamProcessor {
return null;
}).when(mockRunLoop).run();
- doAnswer(invocation ->
+ Mockito.doAnswer(invocation ->
{
containerStop.countDown();
return null;
}).when(mockRunLoop).shutdown();
- container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, mock(StreamTask.class));
+ container = StreamProcessorTestUtils.getDummyContainer(mockRunLoop, Mockito.mock(StreamTask.class));
}
return container;
}
@@ -166,6 +169,7 @@ public class TestStreamProcessor {
final Thread jcThread = new Thread(() ->
{
try {
+ processor.jobCoordinatorListener.onJobModelExpired();
processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
coordinatorStop.await();
processor.jobCoordinatorListener.onCoordinatorStop();
@@ -215,7 +219,7 @@ public class TestStreamProcessor {
*/
@Test
public void testContainerFailureCorrectlyStopsProcessor() throws InterruptedException {
- JobCoordinator mockJobCoordinator = mock(JobCoordinator.class);
+ JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
Throwable expectedThrowable = new SamzaException("Failure in Container!");
AtomicReference<Throwable> actualThrowable = new AtomicReference<>();
final CountDownLatch runLoopStartedLatch = new CountDownLatch(1);
@@ -271,6 +275,7 @@ public class TestStreamProcessor {
new Thread(() ->
{
try {
+ processor.jobCoordinatorListener.onJobModelExpired();
processor.jobCoordinatorListener.onNewJobModel("1", getMockJobModel());
coordinatorStop.await();
processor.jobCoordinatorListener.onCoordinatorStop();
@@ -296,9 +301,146 @@ public class TestStreamProcessor {
Assert.assertTrue(processorListenerState.get(ListenerCallback.ON_FAILURE));
}
- // TODO:
- // Test multiple start / stop and its ordering
- // test onNewJobModel
- // test onJobModelExpiry
- // test Coordinator failure - correctly shutsdown the streamprocessor
+ /**
+  * Calls start() twice on the same StreamProcessor and checks that the state is STARTED after
+  * each call while the JobCoordinator is started exactly once.
+  */
+ @Test
+ public void testStartOperationShouldBeIdempotent() {
+   JobCoordinator coordinator = Mockito.mock(JobCoordinator.class);
+   Mockito.doNothing().when(coordinator).start();
+   StreamProcessorLifecycleListener listener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   StreamProcessor processor = new StreamProcessor(new MapConfig(), new HashMap<>(), null, listener, coordinator);
+
+   // Before any start() call the processor reports NEW.
+   Assert.assertEquals(State.NEW, processor.getState());
+
+   // First start().
+   processor.start();
+   Assert.assertEquals(State.STARTED, processor.getState());
+
+   // Second start() on the already started processor.
+   processor.start();
+   Assert.assertEquals(State.STARTED, processor.getState());
+
+   // Both start() calls together must have reached the coordinator only once.
+   Mockito.verify(coordinator, Mockito.times(1)).start();
+ }
+
+ /**
+  * Drives onJobModelExpired through three scenarios on one StreamProcessor and asserts the
+  * processor state after each of them.
+  */
+ @Test
+ public void testOnJobModelExpiredShouldMakeCorrectStateTransitions() {
+   JobCoordinator coordinator = Mockito.mock(JobCoordinator.class);
+   StreamProcessorLifecycleListener listener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   SamzaContainer containerMock = Mockito.mock(SamzaContainer.class);
+   MapConfig zeroShutdownConfig = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+   StreamProcessor processor = new StreamProcessor(zeroShutdownConfig, new HashMap<>(), null, listener, coordinator);
+
+   // Scenario 1: no SamzaContainer is set and the processor is STARTED.
+   // onJobModelExpired is expected to move the state to IN_REBALANCE.
+   processor.start();
+   Assert.assertEquals(State.STARTED, processor.getState());
+
+   processor.jobCoordinatorListener.onJobModelExpired();
+   Assert.assertEquals(State.IN_REBALANCE, processor.getState());
+
+   // Scenario 2: a SamzaContainer is set and its shutdown does not complete within
+   // onJobModelExpired (hasStopped() stays false, "task.shutdown.ms" is "0").
+   // The processor is expected to move to STOPPING and to stop the JobCoordinator.
+   Mockito.doNothing().when(coordinator).start();
+   Mockito.doNothing().when(coordinator).stop();
+   Mockito.doNothing().when(containerMock).shutdown();
+   Mockito.when(containerMock.hasStopped()).thenReturn(false);
+   Mockito.when(containerMock.getStatus())
+       .thenReturn(SamzaContainerStatus.STARTED, SamzaContainerStatus.STOPPED);
+   processor.container = containerMock;
+   processor.state = State.STARTED;
+
+   processor.jobCoordinatorListener.onJobModelExpired();
+
+   Assert.assertEquals(State.STOPPING, processor.getState());
+   Mockito.verify(containerMock, Mockito.times(1)).shutdown();
+   Mockito.verify(coordinator, Mockito.times(1)).stop();
+
+   // Scenario 3: the processor is already IN_REBALANCE, so onJobModelExpired should be a NO_OP.
+   processor.state = State.IN_REBALANCE;
+
+   processor.jobCoordinatorListener.onJobModelExpired();
+
+   Assert.assertEquals(State.IN_REBALANCE, processor.state);
+ }
+
+ /**
+  * Sets a mock container and the IN_REBALANCE state on a spied StreamProcessor, delivers a new
+  * JobModel through the job coordinator listener and checks that the mock container's run()
+  * was invoked at most once.
+  */
+ @Test
+ public void testOnNewJobModelShouldResultInValidStateTransitions() throws Exception {
+   JobCoordinator coordinator = Mockito.mock(JobCoordinator.class);
+   StreamProcessorLifecycleListener listener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   SamzaContainer containerMock = Mockito.mock(SamzaContainer.class);
+   MapConfig zeroShutdownConfig = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+   StreamProcessor processor =
+       PowerMockito.spy(new StreamProcessor(zeroShutdownConfig, new HashMap<>(), null, listener, coordinator));
+
+   processor.container = containerMock;
+   processor.state = State.IN_REBALANCE;
+   Mockito.doNothing().when(containerMock).run();
+
+   // An empty job model: no config entries and no containers.
+   JobModel emptyJobModel = new JobModel(new MapConfig(), new HashMap<>());
+   processor.jobCoordinatorListener.onNewJobModel("TestProcessorId", emptyJobModel);
+
+   Mockito.verify(containerMock, Mockito.atMost(1)).run();
+ }
+
+ /**
+  * Calls stop() on a spied StreamProcessor whose state was set to RUNNING and expects the state
+  * to be STOPPING afterwards.
+  *
+  * NOTE(review): only a single stop() call is exercised here, and the mock container is stubbed
+  * but never assigned to the processor - confirm whether a second stop() call was intended.
+  */
+ @Test
+ public void testStopShouldBeIdempotent() {
+   JobCoordinator coordinator = Mockito.mock(JobCoordinator.class);
+   StreamProcessorLifecycleListener listener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   SamzaContainer containerMock = Mockito.mock(SamzaContainer.class);
+   MapConfig zeroShutdownConfig = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+   StreamProcessor processor =
+       PowerMockito.spy(new StreamProcessor(zeroShutdownConfig, new HashMap<>(), null, listener, coordinator));
+
+   Mockito.doNothing().when(coordinator).stop();
+   Mockito.doNothing().when(containerMock).shutdown();
+   Mockito.when(containerMock.hasStopped()).thenReturn(false);
+   Mockito.when(containerMock.getStatus())
+       .thenReturn(SamzaContainerStatus.STARTED, SamzaContainerStatus.STOPPED);
+
+   processor.state = State.RUNNING;
+
+   processor.stop();
+
+   Assert.assertEquals(State.STOPPING, processor.state);
+ }
+
+ /**
+  * Reports a coordinator failure to a StreamProcessor that is RUNNING with a mock container and
+  * expects the processor to end up STOPPED, the lifecycle listener to receive onFailure with the
+  * same exception, and the container to be shut down.
+  */
+ @Test
+ public void testCoordinatorFailureShouldStopTheStreamProcessor() {
+   JobCoordinator mockJobCoordinator = Mockito.mock(JobCoordinator.class);
+   StreamProcessorLifecycleListener lifecycleListener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   SamzaContainer mockSamzaContainer = Mockito.mock(SamzaContainer.class);
+   MapConfig config = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+   StreamProcessor streamProcessor = new StreamProcessor(config, new HashMap<>(), null, lifecycleListener, mockJobCoordinator);
+
+   Exception failureException = new Exception("dummy exception");
+
+   // Arrange: stub the container before the callback under test runs, so the stubbing is
+   // already in place when onCoordinatorFailure interacts with the container.
+   Mockito.doNothing().when(mockSamzaContainer).shutdown();
+   Mockito.when(mockSamzaContainer.hasStopped()).thenReturn(false);
+   streamProcessor.container = mockSamzaContainer;
+   streamProcessor.state = State.RUNNING;
+
+   // Act.
+   streamProcessor.jobCoordinatorListener.onCoordinatorFailure(failureException);
+
+   // Assert.
+   Assert.assertEquals(State.STOPPED, streamProcessor.state);
+   Mockito.verify(lifecycleListener).onFailure(failureException);
+   Mockito.verify(mockSamzaContainer).shutdown();
+ }
+
+ /**
+  * Signals onCoordinatorStop to a StreamProcessor whose state was set to RUNNING and expects the
+  * processor to be STOPPED and the lifecycle listener to receive onShutdown.
+  */
+ @Test
+ public void testCoordinatorStopShouldStopTheStreamProcessor() {
+   JobCoordinator coordinator = Mockito.mock(JobCoordinator.class);
+   StreamProcessorLifecycleListener listener = Mockito.mock(StreamProcessorLifecycleListener.class);
+   MapConfig zeroShutdownConfig = new MapConfig(ImmutableMap.of("task.shutdown.ms", "0"));
+   StreamProcessor processor = new StreamProcessor(zeroShutdownConfig, new HashMap<>(), null, listener, coordinator);
+
+   processor.state = State.RUNNING;
+   processor.jobCoordinatorListener.onCoordinatorStop();
+
+   Assert.assertEquals(State.STOPPED, processor.state);
+   Mockito.verify(listener).onShutdown();
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index b27b151..9aca45e 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -203,7 +203,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
onContainerFailedThrowable = t
}
- override def onContainerStop(invokedExternally: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
onContainerStopCalled = true
}
@@ -284,7 +284,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
onContainerFailedThrowable = t
}
- override def onContainerStop(invokedExternally: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
onContainerStopCalled = true
}
@@ -367,7 +367,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
onContainerFailedThrowable = t
}
- override def onContainerStop(invokedExternally: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
onContainerStopCalled = true
}
@@ -451,7 +451,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
onContainerFailedThrowable = t
}
- override def onContainerStop(invokedExternally: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
onContainerStopCalled = true
}
@@ -530,7 +530,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
onContainerFailedThrowable = t
}
- override def onContainerStop(invokedExternally: Boolean): Unit = {
+ override def onContainerStop(): Unit = {
onContainerStopCalled = true
}
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
index 7253b29..5c28553 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessor.java
@@ -128,7 +128,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
// make sure it consumes all the messages from the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
- CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
+ CountDownLatch containerStopped1 = sp1.containerShutdownLatch;
// start the second processor
CountDownLatch startWait2 = new CountDownLatch(1);
@@ -211,7 +211,7 @@ public class TestZkStreamProcessor extends TestZkStreamProcessorBase {
// make sure they consume all the messages from the first batch
waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
- CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+ CountDownLatch containerStopped2 = sp2.containerShutdownLatch;
// stop the first processor
stopProcessor(stopLatch1);
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
index 374e77c..fb9c66b 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorFailures.java
@@ -108,8 +108,8 @@ public class TestZkStreamProcessorFailures extends TestZkStreamProcessorBase {
// make sure they consume all the messages
waitUntilMessagesLeftN(totalEventsToBeConsumed - messageCount);
- CountDownLatch containerStopped1 = sp1.jcContainerShutdownLatch;
- CountDownLatch containerStopped2 = sp2.jcContainerShutdownLatch;
+ CountDownLatch containerStopped1 = sp1.containerShutdownLatch;
+ CountDownLatch containerStopped2 = sp2.containerShutdownLatch;
// produce the bad messages
produceMessages(BAD_MESSAGE_KEY, inputTopic, 4);
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
index 40eeaf0..f518c0a 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorSession.java
@@ -98,10 +98,10 @@ public class TestZkStreamProcessorSession extends TestZkStreamProcessorBase {
waitUntilMessagesLeftN(totalEventsToGenerate - messageCount);
// Get the container stop latch to be able to check when a container is stopped.
- // New jcContainerShutdownLatch is created after each onNewJobModel,
+ // New containerShutdownLatch is created after each onNewJobModel,
// so we need to get the current one, before it changed..
for (int i = 0; i < processorIds.length; i++) {
- containerStopLatches[i] = streamProcessors[i].jcContainerShutdownLatch;
+ containerStopLatches[i] = streamProcessors[i].containerShutdownLatch;
}
// expire zk session of one of the processors
http://git-wip-us.apache.org/repos/asf/samza/blob/93b397e8/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index ea44052..bfa78a0 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,7 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.ArrayList;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -54,7 +54,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.test.processor.TestStreamApplication.StreamApplicationCallback;
-import org.apache.samza.test.processor.TestStreamApplication.TestKafkaEvent;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
@@ -279,16 +278,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Job model before and after the addition of second stream processor should be the same.
assertEquals(previousJobModel[0], updatedJobModel);
assertEquals(new MapConfig(), updatedJobModel.getConfig());
- // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
- // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
-
- // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
- // localApplicationRunner1.kill(streamApp1);
- // localApplicationRunner2.kill(streamApp2);
-
- // localApplicationRunner1.waitForFinish();
- // localApplicationRunner2.waitForFinish();
+ localApplicationRunner1.kill(streamApp1);
+ localApplicationRunner1.waitForFinish();
+ localApplicationRunner2.kill(streamApp2);
+ localApplicationRunner2.waitForFinish();
+ assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+ assertEquals(localApplicationRunner2.status(streamApp2), ApplicationStatus.UnsuccessfulFinish);
}
/**
@@ -387,13 +383,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
processedMessagesLatch.await();
assertEquals(ApplicationStatus.Running, localApplicationRunner2.status(streamApp2));
-
- // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
- // localApplicationRunner1.kill(streamApp1);
- // localApplicationRunner2.kill(streamApp2);
-
- // localApplicationRunner1.waitForFinish();
- // localApplicationRunner2.waitForFinish();
+ localApplicationRunner1.kill(streamApp1);
+ localApplicationRunner1.waitForFinish();
+ localApplicationRunner2.kill(streamApp2);
+ localApplicationRunner2.waitForFinish();
+ assertEquals(localApplicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
}
@Test
@@ -439,7 +433,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
applicationRunner1.kill(streamApp1);
applicationRunner1.waitForFinish();
- // How do you know here that leader has been reelected.
+ assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
kafkaEventsConsumedLatch.await();
publishKafkaEvents(inputKafkaTopic, 0, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
@@ -458,12 +452,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(Sets.newHashSet("0000000001", "0000000002"), jobModel.getContainers().keySet());
assertEquals(2, jobModel.getContainers().size());
- // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
- // applicationRunner2.kill(streamApp2);
- // applicationRunner3.kill(streamApp3);
-
- // applicationRunner2.waitForFinish();
- // applicationRunner3.waitForFinish();
+ applicationRunner2.kill(streamApp2);
+ applicationRunner2.waitForFinish();
+ assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish);
+ applicationRunner3.kill(streamApp3);
+ applicationRunner3.waitForFinish();
+ assertEquals(applicationRunner3.status(streamApp2), ApplicationStatus.SuccessfulFinish);
}
@Test
@@ -501,12 +495,11 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
try {
applicationRunner3.run(streamApp3);
} finally {
- // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
- // applicationRunner1.kill(streamApp1);
- // applicationRunner2.kill(streamApp2);
+ applicationRunner1.kill(streamApp1);
+ applicationRunner2.kill(streamApp2);
- // applicationRunner1.waitForFinish();
- // applicationRunner2.waitForFinish();
+ applicationRunner1.waitForFinish();
+ applicationRunner2.waitForFinish();
}
}
@@ -526,15 +519,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
- List<TestKafkaEvent> messagesProcessed = new ArrayList<>();
- StreamApplicationCallback streamApplicationCallback = m -> messagesProcessed.add(m);
-
// Create StreamApplication from configuration.
CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
- StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+ StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
// Run stream application.
@@ -551,10 +541,12 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
applicationRunner1.kill(streamApp1);
applicationRunner1.waitForFinish();
+ assertEquals(applicationRunner1.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+
LocalApplicationRunner applicationRunner4 = new LocalApplicationRunner(applicationConfig1);
processedMessagesLatch1 = new CountDownLatch(1);
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
- streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, streamApplicationCallback, kafkaEventsConsumedLatch, applicationConfig1);
+ streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
applicationRunner4.run(streamApp1);
processedMessagesLatch1.await();
@@ -566,12 +558,85 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
assertEquals(Integer.parseInt(jobModelVersion) + 1, Integer.parseInt(newJobModelVersion));
assertEquals(jobModel.getContainers(), newJobModel.getContainers());
- // TODO: re-enable the following kill and waitForFinish calls after fixing SAMZA-1665
- // applicationRunner2.kill(streamApp2);
- // applicationRunner4.kill(streamApp1);
+ applicationRunner2.kill(streamApp2);
+ applicationRunner2.waitForFinish();
+ assertEquals(applicationRunner2.status(streamApp2), ApplicationStatus.SuccessfulFinish);
+ applicationRunner4.kill(streamApp1);
+ applicationRunner4.waitForFinish();
+ assertEquals(applicationRunner4.status(streamApp1), ApplicationStatus.SuccessfulFinish);
+ }
+
+ /**
+  * Runs two stream applications with TaskConfig.SHUTDOWN_MS set to "0", then starts a third
+  * processor and, once processedMessagesLatch3 is released, asserts the status reported by the
+  * first two runners before killing the third.
+  */
+ @Test
+ public void testShouldStopStreamApplicationWhenShutdownTimeOutIsLessThanContainerShutdownTime() throws Exception {
+   publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+   Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+   configMap.put(TaskConfig.SHUTDOWN_MS(), "0");
+
+   configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+   Config applicationConfig1 = new MapConfig(configMap);
+
+   configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+   Config applicationConfig2 = new MapConfig(configMap);
+
+   LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
+   LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
+
+   // Create StreamApplication from configuration.
+   CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+   CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+   CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+   StreamApplication streamApp1 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch1, null, kafkaEventsConsumedLatch, applicationConfig1);
+   StreamApplication streamApp2 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch2, null, kafkaEventsConsumedLatch, applicationConfig2);
+
+   applicationRunner1.run(streamApp1);
+   applicationRunner2.run(streamApp2);
+
+   processedMessagesLatch1.await();
+   processedMessagesLatch2.await();
+   kafkaEventsConsumedLatch.await();
+
+   // At this stage, both the processors are running and have drained the kafka source.
+   // Trigger re-balancing phase, by manually adding a new processor.
+
+   configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+   Config applicationConfig3 = new MapConfig(configMap);
+
+   LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig3);
+   CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
- // applicationRunner2.waitForFinish();
- // applicationRunner4.waitForFinish();
+   StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch3, null, kafkaEventsConsumedLatch, applicationConfig3);
+   applicationRunner3.run(streamApp3);
+
+   publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+   processedMessagesLatch3.await();
+
+   // If the processing has started in the third stream processor, then other two stream processors should be stopped.
+   // TODO: This is a bug! Status should be unsuccessful finish.
+   // assertEquals takes (expected, actual); keeping that order makes failure messages read correctly.
+   assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner1.status(streamApp1));
+   assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner2.status(streamApp2));
+
+   applicationRunner3.kill(streamApp3);
+   applicationRunner3.waitForFinish();
+   assertEquals(ApplicationStatus.SuccessfulFinish, applicationRunner3.status(streamApp3));
+ }
+
+ /**
+  * Serializable test event made of an event id and the event's data.
+  */
+ private static class TestKafkaEvent implements Serializable {
+
+   // Actual content of the event.
+   private String eventData;
+
+   // Contains Integer value, which is greater than previous message id.
+   private String eventId;
+
+   /**
+    * @param eventId identifier of the event
+    * @param eventData content of the event
+    */
+   TestKafkaEvent(String eventId, String eventData) {
+     // Store the id argument itself; previously eventData was assigned to both fields.
+     this.eventId = eventId;
+     this.eventData = eventData;
+   }
}
}