You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2013/01/16 22:17:53 UTC

git commit: FLUME-1849. Embedded Agent doesn't shutdown supervisor.

Updated Branches:
  refs/heads/trunk 118752374 -> ab0894c7f


FLUME-1849. Embedded Agent doesn't shutdown supervisor.

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ab0894c7
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ab0894c7
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ab0894c7

Branch: refs/heads/trunk
Commit: ab0894c7f014769a9b400e6b5cb2f5c055c6b065
Parents: 1187523
Author: Hari Shreedharan <ha...@gmail.com>
Authored: Wed Jan 16 13:17:09 2013 -0800
Committer: Hari Shreedharan <ha...@gmail.com>
Committed: Wed Jan 16 13:17:09 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/agent/embedded/EmbeddedAgent.java |   51 +--------------
 .../embedded/TestEmbeddedAgentEmbeddedSource.java  |   28 --------
 2 files changed, 2 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
index 4adbea7..d02f440 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgent.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 /**
  * EmbeddedAgent gives Flume users the ability to embed simple agents in
@@ -136,7 +135,7 @@ public class EmbeddedAgent {
     if(state != State.STARTED) {
       throw new IllegalStateException("Cannot be stopped unless started");
     }
-    doStop();
+    supervisor.stop();
     embeddedSource = null;
     state = State.STOPPED;
   }
@@ -212,7 +211,6 @@ public class EmbeddedAgent {
 
   private void doStart() {
     boolean error = true;
-    List<LifecycleAware> supervised = Lists.newArrayList();
     try {
       channel.start();
       sinkRunner.start();
@@ -220,28 +218,17 @@ public class EmbeddedAgent {
 
       supervisor.supervise(channel,
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      supervised.add(channel);
       supervisor.supervise(sinkRunner,
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      supervised.add(sinkRunner);
       supervisor.supervise(sourceRunner,
           new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
-      supervised.add(sourceRunner);
-
       error = false;
     } finally {
       if(error) {
-        for(LifecycleAware lifeCycleAware : supervised) {
-          try {
-            supervisor.unsupervise(lifeCycleAware);
-          } catch (Exception e) {
-            LOGGER.warn("Exception while stopping " + lifeCycleAware +
-                " due to error on startup", e);
-          }
-        }
         stopLogError(sourceRunner);
         stopLogError(channel);
         stopLogError(sinkRunner);
+        supervisor.stop();
       }
     }
   }
@@ -254,40 +241,6 @@ public class EmbeddedAgent {
       LOGGER.warn("Exception while stopping " + lifeCycleAware, e);
     }
   }
-  private void doStop() {
-    Exception exception = null;
-    // source
-    try {
-      if(LifecycleState.START.equals(sourceRunner.getLifecycleState())) {
-        sourceRunner.stop();
-      }
-    } catch (Exception e) {
-      exception = e;
-      LOGGER.error("Caught exception stopping source " + sourceRunner, e);
-    }
-    // sink
-    try {
-      if(LifecycleState.START.equals(sinkRunner.getLifecycleState())) {
-        sinkRunner.stop();
-      }
-    } catch (Exception e) {
-      exception = e;
-      LOGGER.error("Caught exception stopping sink " + sinkRunner, e);
-    }
-    // channel
-    try {
-      if(LifecycleState.START.equals(channel.getLifecycleState())) {
-        channel.stop();
-      }
-    } catch (Exception e) {
-      exception = e;
-      LOGGER.error("Caught exception stopping channel " + channel, e);
-    }
-    if(exception != null) {
-      throw new FlumeException("Error stopping one or more components " +
-          "check the logs for an exhaustive list of errors", exception);
-    }
-  }
 
   private static enum State {
     NEW(),

http://git-wip-us.apache.org/repos/asf/flume/blob/ab0894c7/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
index b315770..4e94d72 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
@@ -165,34 +165,6 @@ public class TestEmbeddedAgentEmbeddedSource {
     private static final long serialVersionUID = 116546244849853151L;
   }
   @Test
-  public void testStopSourceThrowsException() {
-    doThrow(new LocalRuntimeException()).when(sourceRunner).stop();
-    stopExpectingLocalRuntimeException();
-  }
-  @Test
-  public void testStopChannelThrowsException() {
-    doThrow(new LocalRuntimeException()).when(channel).stop();
-    stopExpectingLocalRuntimeException();
-  }
-  @Test
-  public void testStopSinkThrowsException() {
-    doThrow(new LocalRuntimeException()).when(sinkRunner).stop();
-    stopExpectingLocalRuntimeException();
-  }
-  private void stopExpectingLocalRuntimeException() {
-    agent.configure(properties);
-    agent.start();
-    try {
-      agent.stop();
-      Assert.fail();
-    } catch (FlumeException e) {
-      Assert.assertTrue(e.getCause() instanceof LocalRuntimeException);
-    }
-    verify(sourceRunner, times(1)).stop();
-    verify(channel, times(1)).stop();
-    verify(sinkRunner, times(1)).stop();
-  }
-  @Test
   public void testPut() throws EventDeliveryException {
     Event event = new SimpleEvent();
     agent.configure(properties);