You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/25 13:17:36 UTC

svn commit: r1161506 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/agent/ main/java/com/cloudera/flume/conf/ test/java/com/cloudera/flume/agent/ test/java/com/cloudera/flume/agent/diskfailover/ test/java/com/cloudera/flume/sh...

Author: jmhsieh
Date: Thu Aug 25 11:17:35 2011
New Revision: 1161506

URL: http://svn.apache.org/viewvc?rev=1161506&view=rev
Log:
FLUME-706: Flume nodes launch duplicate logical nodes

When a logical node is being spawned for the first time we attempt to load the config of the node. Unfortunately, we would subsequently load it
again and spawn a second driver thread because we neglected to update the last good config version. This fixes the problem by making sure that
value gets updated on the first attempt. We also update error handling so that a failure to spawn a single logical node only affects that node.

Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LivenessManager.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNode.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNodeManager.java
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfigData.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestAgentCloseNoDeadlock.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestLogicalNodeManager.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverBehavior.java
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/shell/TestFlumeShell.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/FlumeNode.java Thu Aug 25 11:17:35 2011
@@ -44,8 +44,8 @@ import com.cloudera.flume.agent.durabili
 import com.cloudera.flume.agent.durability.WALManager;
 import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeBuilder;
+import com.cloudera.flume.conf.FlumeConfigData;
 import com.cloudera.flume.conf.FlumeConfiguration;
-import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.conf.LogicalNodeContext;
 import com.cloudera.flume.handlers.debug.ChokeManager;
 import com.cloudera.flume.handlers.endtoend.AckListener;
@@ -509,12 +509,14 @@ public class FlumeNode implements Report
       LOG.info("Loading spec from command line: '" + spec + "'");
 
       try {
-        // TODO the first one should be physical node name
+        // node name is the default logical and physical name.
         Context ctx = new LogicalNodeContext(nodename, nodename);
         Map<String, Pair<String, String>> cfgs = FlumeBuilder.parseConf(ctx,
             spec);
         Pair<String, String> node = cfgs.get(nodename);
-        flume.nodesMan.spawn(nodename, node.getLeft(), node.getRight());
+        FlumeConfigData fcd = new FlumeConfigData(0, node.getLeft(),
+            node.getRight(), 0, 0, null);
+        flume.nodesMan.spawn(ctx, nodename, fcd);
       } catch (Exception e) {
         LOG.warn("Caught exception loading node:" + e.getMessage());
         LOG.debug("Exception: ", e);
@@ -524,12 +526,12 @@ public class FlumeNode implements Report
       }
 
     } else {
-      // default to null configurations.
       try {
-        flume.nodesMan.spawn(nodename, "null", "null");
-      } catch (FlumeSpecException e) {
-        LOG.error("This should never happen", e);
-      } catch (IOException e) {
+        // default to null configurations.
+        Context ctx = new LogicalNodeContext(nodename, nodename);
+        FlumeConfigData fcd = new FlumeConfigData(0, "null", "null", 0, 0, null);
+        flume.nodesMan.spawn(ctx, nodename, fcd);
+      } catch (Exception e) {
         LOG.error("Caught exception loading node", e);
       }
     }

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LivenessManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LivenessManager.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LivenessManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LivenessManager.java Thu Aug 25 11:17:35 2011
@@ -29,9 +29,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.flume.agent.durability.WALCompletionNotifier;
+import com.cloudera.flume.conf.Context;
 import com.cloudera.flume.conf.FlumeConfigData;
 import com.cloudera.flume.conf.FlumeConfiguration;
-import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.LogicalNodeContext;
 import com.cloudera.flume.handlers.endtoend.AckListener.Empty;
 import com.cloudera.util.Clock;
 import com.cloudera.util.Pair;
@@ -130,17 +131,19 @@ public class LivenessManager {
     for (String ln : lns) {
       // a logical node is not present? spawn it.
       if (nodesman.get(ln) == null) {
+        Context ctx = new LogicalNodeContext(physNode, ln);
+        final FlumeConfigData data = master.getConfig(ln);
         try {
-          final FlumeConfigData data = master.getConfig(ln);
           if (data == null) {
             LOG.debug("Logical Node '" + ln + "' not configured on master");
-            nodesman.spawn(ln, "null", "null");
+            nodesman.spawn(ctx, ln, null);
           } else {
-            nodesman.spawn(ln, data.getSourceConfig(), data.getSinkConfig());
+            nodesman.spawn(ctx, ln, data);
           }
-        } catch (FlumeSpecException e) {
-          LOG.error("This should never happen", e);
+        } catch (Exception e) {
+          LOG.error("Failed to spawn node '" + ln + "' " + data);
         }
+
       }
     }
     // Update the Chokeinformation for the ChokeManager
@@ -166,7 +169,6 @@ public class LivenessManager {
           LOG.debug("Logical Node '" + nd.getName()
               + "' not configured on master");
         }
-        final LogicalNode node = nd;
         enqueueCheckConfig(nd, data);
       }
     }

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNode.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNode.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNode.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNode.java Thu Aug 25 11:17:35 2011
@@ -137,6 +137,7 @@ public class LogicalNode implements Repo
    * source values into the sink. This will block until the previous driver has
    * shutdown but will return as the new driver is starting.
    */
+  @Deprecated
   synchronized void loadNodeDriver(EventSource newSrc, EventSink newSnk)
       throws IOException, InterruptedException {
     ensureClosedSourceSink(newSrc, newSnk);
@@ -190,7 +191,7 @@ public class LogicalNode implements Repo
   }
 
   public void loadConfig(FlumeConfigData cfg) throws IOException,
-      RuntimeException, FlumeSpecException {
+      RuntimeException, FlumeSpecException, InterruptedException {
 
     // got a newer configuration
     LOG.debug("Attempt to load config " + cfg);
@@ -245,19 +246,14 @@ public class LogicalNode implements Repo
       throw e;
     }
 
-    try {
-      loadNodeDriver(newSrc, newSnk);
+    loadNodeDriver(newSrc, newSnk);
 
-      // We have successfully opened the source and sinks for the config. We can
-      // mark this as the last good / successful config. It does not mean that
-      // this configuration will open without errors!
-      this.lastGoodCfg = cfg;
-
-      LOG.info("Node config successfully set to " + cfg);
-    } catch (InterruptedException e) {
-      // TODO figure out what to do on interruption
-      LOG.error("Load Config interrupted", e);
-    }
+    // We have successfully opened the source and sinks for the config. We can
+    // mark this as the last good / successful config. It does not mean that
+    // this configuration will open without errors!
+    this.lastGoodCfg = cfg;
+
+    LOG.info("Node config successfully set to " + cfg);
   }
 
   /**

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNodeManager.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNodeManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/LogicalNodeManager.java Thu Aug 25 11:17:35 2011
@@ -32,15 +32,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.cloudera.flume.conf.Context;
-import com.cloudera.flume.conf.FlumeBuilder;
 import com.cloudera.flume.conf.FlumeConfigData;
 import com.cloudera.flume.conf.FlumeConfiguration;
 import com.cloudera.flume.conf.FlumeSpecException;
 import com.cloudera.flume.conf.LogicalNodeContext;
 import com.cloudera.flume.conf.ReportTestingContext;
-import com.cloudera.flume.core.CompositeSink;
-import com.cloudera.flume.core.EventSink;
-import com.cloudera.flume.core.EventSource;
 import com.cloudera.flume.reporter.ReportEvent;
 import com.cloudera.flume.reporter.ReportManager;
 import com.cloudera.flume.reporter.Reportable;
@@ -67,21 +63,14 @@ public class LogicalNodeManager implemen
 
   /**
    * Give a logical node name, and src and a sink spec, generate a new logical
-   * node.
-   */
-  synchronized public void spawn(String name, String src, String snk)
-      throws IOException, FlumeSpecException {
-    Context ctx = new LogicalNodeContext(physicalNode, name);
-    spawn(ctx, name, FlumeBuilder.buildSource(ctx, src), new CompositeSink(ctx,
-        snk));
-  }
-
-  /**
-   * Give a logical node name, and src and a sink spec, generate a new logical
    * node. This gives assumes the current version and is ONLY used for testing.
+   * 
+   * @throws InterruptedException
+   * @throws RuntimeException
    */
   synchronized public void testingSpawn(String name, String src, String snk)
-      throws IOException, FlumeSpecException {
+      throws IOException, FlumeSpecException, RuntimeException,
+      InterruptedException {
     LogicalNode nd = threads.get(name);
     if (nd == null) {
       Context ctx = new ReportTestingContext(new LogicalNodeContext(
@@ -98,9 +87,8 @@ public class LogicalNodeManager implemen
 
   }
 
-  // TODO (jon) make private
-  synchronized void spawn(Context ctx, String name, EventSource src,
-      EventSink snk) throws IOException {
+  public synchronized void spawn(Context ctx, String name, FlumeConfigData fcd)
+      throws IOException, RuntimeException, FlumeSpecException, InterruptedException {
     LogicalNode nd = threads.get(name);
     if (nd == null) {
       LOG.info("creating new logical node " + name);
@@ -108,13 +96,12 @@ public class LogicalNodeManager implemen
       threads.put(nd.getName(), nd);
     }
 
-    try {
-      nd.loadNodeDriver(src, snk);
-    } catch (InterruptedException e) {
-      // TODO verify this is reasonable behavior
-      LOG.error("spawn was interrupted", e);
+    if (fcd == null) {
+      fcd = new FlumeConfigData(0, "null", "null", 0, 0, null);
     }
 
+    LOG.info("Loading node name with " + fcd);
+    nd.loadConfig(fcd);
   }
 
   /**

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfigData.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfigData.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfigData.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/conf/FlumeConfigData.java Thu Aug 25 11:17:35 2011
@@ -17,6 +17,8 @@
  */
 package com.cloudera.flume.conf;
 
+import java.util.Date;
+
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.XmlTransient;
 
@@ -60,9 +62,12 @@ public class FlumeConfigData {
     this.flowID = fcd.flowID;
   }
 
-  /** Empty constructor. **/
+  /** Empty constructor for RPC **/
   public FlumeConfigData() {
+  }
 
+  public static FlumeConfigData testingFlumeConfigData(String src, String snk) {
+    return new FlumeConfigData(0, src, snk, 0, 0, null);
   }
 
   public long getTimestamp() {
@@ -112,4 +117,11 @@ public class FlumeConfigData {
   public void setFlowID(String flowID) {
     this.flowID = flowID;
   }
+
+  public String toString() {
+    return String.format("FlumeConfigData: {srcVer:'%s' snkVer:'%s'  ts='%s' "
+        + "flowId:'%s' source:'%s' sink:'%s' }", new Date(sourceVersion),
+        new Date(sinkVersion), new Date(timestamp), flowID, sourceConfig,
+        sinkConfig);
+  }
 }

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestAgentCloseNoDeadlock.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestAgentCloseNoDeadlock.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestAgentCloseNoDeadlock.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestAgentCloseNoDeadlock.java Thu Aug 25 11:17:35 2011
@@ -27,7 +27,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.cloudera.flume.conf.FlumeConfigData;
 import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.LogicalNodeContext;
 
 /**
  * These test cases verify that closing an agent eventually exits instead of
@@ -57,7 +59,8 @@ public class TestAgentCloseNoDeadlock {
         try {
           go.await();
           while (heartstop.getCount() > 0) {
-            lnm.spawn("foo1", "asciisynth(1)", sink);
+            lnm.spawn(LogicalNodeContext.testingContext(), "foo1",
+                FlumeConfigData.testingFlumeConfigData("asciisynth(1)", sink));
             heartstop.countDown();
           }
         } catch (Exception e) {
@@ -84,8 +87,8 @@ public class TestAgentCloseNoDeadlock {
     }.start();
 
     go.countDown();
-    assertTrue("heartbeat thread blocked", heartstop.await(200,
-        TimeUnit.SECONDS));
+    assertTrue("heartbeat thread blocked",
+        heartstop.await(200, TimeUnit.SECONDS));
   }
 
   /**

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestLogicalNodeManager.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestLogicalNodeManager.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/TestLogicalNodeManager.java Thu Aug 25 11:17:35 2011
@@ -26,6 +26,7 @@ import java.util.List;
 import org.junit.Test;
 
 import com.cloudera.flume.conf.FlumeSpecException;
+import com.cloudera.flume.conf.LogicalNodeContext;
 import com.google.common.collect.Lists;
 
 /**
@@ -37,9 +38,9 @@ public class TestLogicalNodeManager {
   public void testSpawnDecomission() throws IOException, FlumeSpecException,
       InterruptedException {
     LogicalNodeManager lnm = new LogicalNodeManager("local");
-    lnm.spawn("foo1", "null", "null");
-    lnm.spawn("foo2", "null", "null");
-    lnm.spawn("foo3", "null", "null");
+    lnm.spawn(LogicalNodeContext.testingContext(), "foo1", null);
+    lnm.spawn(LogicalNodeContext.testingContext(),"foo2", null);
+    lnm.spawn(LogicalNodeContext.testingContext(),"foo3", null);
 
     assertEquals(3, lnm.getNodes().size());
 
@@ -59,9 +60,9 @@ public class TestLogicalNodeManager {
   public void testSpawnDecomissionAllBut() throws IOException,
       FlumeSpecException, InterruptedException {
     LogicalNodeManager lnm = new LogicalNodeManager("local");
-    lnm.spawn("foo1", "null", "null");
-    lnm.spawn("foo2", "null", "null");
-    lnm.spawn("foo3", "null", "null");
+    lnm.spawn(LogicalNodeContext.testingContext(),"foo1", null);
+    lnm.spawn(LogicalNodeContext.testingContext(),"foo2", null);
+    lnm.spawn(LogicalNodeContext.testingContext(),"foo3", null);
 
     assertEquals(3, lnm.getNodes().size());
 

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverBehavior.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverBehavior.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverBehavior.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/diskfailover/TestDiskFailoverBehavior.java Thu Aug 25 11:17:35 2011
@@ -68,7 +68,7 @@ public class TestDiskFailoverBehavior {
   }
 
   LogicalNode setupAgent(long count, String agentSink) throws IOException,
-      RuntimeException, FlumeSpecException {
+      RuntimeException, FlumeSpecException, InterruptedException {
     LogicalNode agent = new LogicalNode(
         new LogicalNodeContext("phys", "agent"), "agent");
     FlumeConfigData fcd = new FlumeConfigData(0, "asciisynth(" + count + ")",
@@ -78,7 +78,7 @@ public class TestDiskFailoverBehavior {
   }
 
   LogicalNode setupColl(long port, String name, String acc) throws IOException,
-      RuntimeException, FlumeSpecException {
+      RuntimeException, FlumeSpecException, InterruptedException {
     Context ctx = new LogicalNodeContext(new ReportTestingContext(), "phys",
         name);
     LogicalNode coll = new LogicalNode(ctx, name);

Modified: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/shell/TestFlumeShell.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/shell/TestFlumeShell.java?rev=1161506&r1=1161505&r2=1161506&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/shell/TestFlumeShell.java (original)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/shell/TestFlumeShell.java Thu Aug 25 11:17:35 2011
@@ -33,6 +33,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintStream;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.thrift.transport.TTransportException;
 import org.junit.Ignore;
@@ -42,8 +43,13 @@ import org.slf4j.LoggerFactory;
 
 import com.cloudera.flume.agent.DirectMasterRPC;
 import com.cloudera.flume.agent.FlumeNode;
+import com.cloudera.flume.conf.Context;
+import com.cloudera.flume.conf.FlumeBuilder;
 import com.cloudera.flume.conf.FlumeConfigData;
 import com.cloudera.flume.conf.FlumeConfiguration;
+import com.cloudera.flume.conf.SourceFactory.SourceBuilder;
+import com.cloudera.flume.conf.SourceFactoryImpl;
+import com.cloudera.flume.core.EventSource;
 import com.cloudera.flume.master.ConfigurationManager;
 import com.cloudera.flume.master.SetupMasterTestEnv;
 import com.cloudera.flume.master.StatusManager.NodeState;
@@ -514,4 +520,48 @@ public class TestFlumeShell extends Setu
     assertNull(flumeMaster.getStatMan().getStatus("foo"));
     assertNull(flumeMaster.getStatMan().getStatus("foo"));
   }
+
+  /**
+   * Create a master, connect via shell, and issue and map a config with a
+   * single instance of a source that increments a value ever open. Then load
+   * configuration. Verify that it came up exactly once.
+   */
+  @Test
+  public void testNoDoubleDriverOpen() throws InterruptedException,
+      TTransportException, IOException {
+    assertEquals(0, flumeMaster.getSpecMan().getAllConfigs().size());
+    final AtomicInteger i = new AtomicInteger();
+    SourceFactoryImpl sfi = new SourceFactoryImpl();
+    sfi.setSource("openCount", new SourceBuilder() {
+      @Override
+      public EventSource build(Context context, String... argv) {
+        return new EventSource.Base() {
+          public void open() throws InterruptedException {
+            i.incrementAndGet();
+          }
+        };
+      }
+
+    });
+    FlumeBuilder.setSourceFactory(sfi);
+
+    String nodename = "foo";
+    FlumeConfiguration conf = FlumeConfiguration.createTestableConfiguration();
+
+    FlumeShell sh = new FlumeShell();
+    sh.executeLine("connect localhost: "
+        + FlumeConfiguration.DEFAULT_ADMIN_PORT);
+    sh.executeLine("exec map foo bar");
+    sh.executeLine("exec config bar 'openCount' 'null' ");
+
+    FlumeNode n = new FlumeNode(conf, nodename,
+        new DirectMasterRPC(flumeMaster), false, false);
+    n.start();
+    Clock.sleep(1000); // if double open, one thread will get port, the other
+                       // will fail
+    n.stop();
+    // the magic count should only be 1
+    assertEquals(1, i.get());
+  }
+
 }