You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/01/16 22:17:53 UTC
git commit: FLUME-1849. Embedded Agent doesn't shutdown supervisor.
Updated Branches:
refs/heads/trunk 118752374 -> ab0894c7f
FLUME-1849. Embedded Agent doesn't shutdown supervisor.
(Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ab0894c7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ab0894c7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ab0894c7
Branch: refs/heads/trunk
Commit: ab0894c7f014769a9b400e6b5cb2f5c055c6b065
Parents: 1187523
Author: Hari Shreedharan <ha...@gmail.com>
Authored: Wed Jan 16 13:17:09 2013 -0800
Committer: Hari Shreedharan <ha...@gmail.com>
Committed: Wed Jan 16 13:17:09 2013 -0800
----------------------------------------------------------------------
.../apache/flume/agent/embedded/EmbeddedAgent.java | 51 +--------------
.../embedded/TestEmbeddedAgentEmbeddedSource.java | 28 --------
2 files changed, 2 insertions(+), 77 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index 4adbea7..d02f440 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
/**
* EmbeddedAgent gives Flume users the ability to embed simple agents in
@@ -136,7 +135,7 @@ public class EmbeddedAgent {
if(state != State.STARTED) {
throw new IllegalStateException("Cannot be stopped unless started");
}
- doStop();
+ supervisor.stop();
embeddedSource = null;
state = State.STOPPED;
}
@@ -212,7 +211,6 @@ public class EmbeddedAgent {
private void doStart() {
boolean error = true;
- List<LifecycleAware> supervised = Lists.newArrayList();
try {
channel.start();
sinkRunner.start();
@@ -220,28 +218,17 @@ public class EmbeddedAgent {
supervisor.supervise(channel,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- supervised.add(channel);
supervisor.supervise(sinkRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- supervised.add(sinkRunner);
supervisor.supervise(sourceRunner,
new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
- supervised.add(sourceRunner);
-
error = false;
} finally {
if(error) {
- for(LifecycleAware lifeCycleAware : supervised) {
- try {
- supervisor.unsupervise(lifeCycleAware);
- } catch (Exception e) {
- LOGGER.warn("Exception while stopping " + lifeCycleAware +
- " due to error on startup", e);
- }
- }
stopLogError(sourceRunner);
stopLogError(channel);
stopLogError(sinkRunner);
+ supervisor.stop();
}
}
}
@@ -254,40 +241,6 @@ public class EmbeddedAgent {
LOGGER.warn("Exception while stopping " + lifeCycleAware, e);
}
}
- private void doStop() {
- Exception exception = null;
- // source
- try {
- if(LifecycleState.START.equals(sourceRunner.getLifecycleState())) {
- sourceRunner.stop();
- }
- } catch (Exception e) {
- exception = e;
- LOGGER.error("Caught exception stopping source " + sourceRunner, e);
- }
- // sink
- try {
- if(LifecycleState.START.equals(sinkRunner.getLifecycleState())) {
- sinkRunner.stop();
- }
- } catch (Exception e) {
- exception = e;
- LOGGER.error("Caught exception stopping sink " + sinkRunner, e);
- }
- // channel
- try {
- if(LifecycleState.START.equals(channel.getLifecycleState())) {
- channel.stop();
- }
- } catch (Exception e) {
- exception = e;
- LOGGER.error("Caught exception stopping channel " + channel, e);
- }
- if(exception != null) {
- throw new FlumeException("Error stopping one or more components " +
- "check the logs for an exhaustive list of errors", exception);
- }
- }
private static enum State {
NEW(),
http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
index b315770..4e94d72 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
@@ -165,34 +165,6 @@ public class TestEmbeddedAgentEmbeddedSource {
private static final long serialVersionUID = 116546244849853151L;
}
@Test
- public void testStopSourceThrowsException() {
- doThrow(new LocalRuntimeException()).when(sourceRunner).stop();
- stopExpectingLocalRuntimeException();
- }
- @Test
- public void testStopChannelThrowsException() {
- doThrow(new LocalRuntimeException()).when(channel).stop();
- stopExpectingLocalRuntimeException();
- }
- @Test
- public void testStopSinkThrowsException() {
- doThrow(new LocalRuntimeException()).when(sinkRunner).stop();
- stopExpectingLocalRuntimeException();
- }
- private void stopExpectingLocalRuntimeException() {
- agent.configure(properties);
- agent.start();
- try {
- agent.stop();
- Assert.fail();
- } catch (FlumeException e) {
- Assert.assertTrue(e.getCause() instanceof LocalRuntimeException);
- }
- verify(sourceRunner, times(1)).stop();
- verify(channel, times(1)).stop();
- verify(sinkRunner, times(1)).stop();
- }
- @Test
public void testPut() throws EventDeliveryException {
Event event = new SimpleEvent();
agent.configure(properties);