You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/20 02:48:50 UTC

svn commit: r1159825 - in /incubator/flume/trunk/flume-core/src: main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java

Author: jmhsieh
Date: Sat Aug 20 00:48:50 2011
New Revision: 1159825

URL: http://svn.apache.org/viewvc?rev=1159825&view=rev
Log:
FLUME-745: Race condition in NaiveFileWALDeco and retransmit logic

- Set up a test that runs for a long time, exacerbating the potential race every 10ms.
- Made the test runnable from the command line for an arbitrary number of iterations.
- Eliminated a possible memory leak by removing the WALData entry once a tag is E2EACKED.
- NaiveFileWALManager now synchronizes on a private lock object instead of on itself.

Added:
    incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
Modified:
    incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java

Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java?rev=1159825&r1=1159824&r2=1159825&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java Sat Aug 20 00:48:50 2011
@@ -101,18 +101,33 @@ public class NaiveFileWALManager impleme
 
   private volatile boolean shuttingDown = false;
 
+  private Object lock = new Object();
+
   /**
    * Simple record for keeping the state of tag.
    */
   static class WALData {
     State s;
     String tag;
+    int sentCount = 0;
 
     WALData(String tag) {
       this.s = State.WRITING;
       this.tag = tag;
     }
 
+    void incrementSentCount() {
+      sentCount++;
+    }
+
+    int getSentCount() {
+      return sentCount;
+    }
+
+    public String toString() {
+      return "WALData { " + tag + " state:" + s + " sent:" + sentCount + "}";
+    }
+
     static WALData recovered(String tag) {
       WALData data = new WALData(tag);
       data.s = State.LOGGED;
@@ -148,35 +163,37 @@ public class NaiveFileWALManager impleme
     this.baseDir = baseDir;
   }
 
-  synchronized public void open() throws IOException {
-    // make the dirs if they do not exist
-    if (!FileUtil.makeDirs(importDir)) {
-      throw new IOException("Unable to create import dir: " + importDir);
-    }
-    if (!FileUtil.makeDirs(writingDir)) {
-      throw new IOException("Unable to create writing dir: " + writingDir);
-    }
-    if (!FileUtil.makeDirs(loggedDir)) {
-      throw new IOException("Unable to create logged dir: " + loggedDir);
-    }
-    if (!FileUtil.makeDirs(sendingDir)) {
-      throw new IOException("Unable to create sending dir: " + sendingDir);
-    }
-    if (!FileUtil.makeDirs(sentDir)) {
-      throw new IOException("Unable to create import dir: " + sentDir);
-    }
-    if (!FileUtil.makeDirs(doneDir)) {
-      throw new IOException("Unable to create writing dir: " + doneDir);
-    }
-    if (!FileUtil.makeDirs(errorDir)) {
-      throw new IOException("Unable to create logged dir: " + errorDir);
-    }
+  public void open() throws IOException {
+    synchronized (lock) {
+      // make the dirs if they do not exist
+      if (!FileUtil.makeDirs(importDir)) {
+        throw new IOException("Unable to create import dir: " + importDir);
+      }
+      if (!FileUtil.makeDirs(writingDir)) {
+        throw new IOException("Unable to create writing dir: " + writingDir);
+      }
+      if (!FileUtil.makeDirs(loggedDir)) {
+        throw new IOException("Unable to create logged dir: " + loggedDir);
+      }
+      if (!FileUtil.makeDirs(sendingDir)) {
+        throw new IOException("Unable to create sending dir: " + sendingDir);
+      }
+      if (!FileUtil.makeDirs(sentDir)) {
+        throw new IOException("Unable to create import dir: " + sentDir);
+      }
+      if (!FileUtil.makeDirs(doneDir)) {
+        throw new IOException("Unable to create writing dir: " + doneDir);
+      }
+      if (!FileUtil.makeDirs(errorDir)) {
+        throw new IOException("Unable to create logged dir: " + errorDir);
+      }
 
-    if (shuttingDown) {
-      LOG.warn("Strange, shutting down but now reopening");
+      if (shuttingDown) {
+        LOG.warn("Strange, shutting down but now reopening");
+      }
+      shuttingDown = false;
+      LOG.info("NaiveFileWALManager is now open");
     }
-    shuttingDown = false;
-    LOG.info("NaiveFileWALManager is now open");
   }
 
   public Collection<String> getWritingTags() {
@@ -200,12 +217,14 @@ public class NaiveFileWALManager impleme
    * 
    * This is not a blocking close.
    */
-  synchronized public void stopDrains() throws IOException {
-    if (shuttingDown) {
-      LOG.warn("Already shutting down, but getting another shutting down notice, odd");
+  public void stopDrains() throws IOException {
+    synchronized (lock) {
+      if (shuttingDown) {
+        LOG.warn("Already shutting down, but getting another shutting down notice, odd");
+      }
+      shuttingDown = true;
+      LOG.info("NaiveFileWALManager shutting down");
     }
-    shuttingDown = true;
-    LOG.info("NaiveFileWALManager shutting down");
   }
 
   /**
@@ -393,121 +412,126 @@ public class NaiveFileWALManager impleme
    * restart from there. Optimizations can recover at finer grain and be more
    * performant.
    */
-  synchronized public void recover() throws IOException {
+  public void recover() throws IOException {
+    synchronized (lock) {
+      // move all writing into the logged dir.
+      for (String f : writingDir.list()) {
+        try {
+          recoverLog(writingDir, f);
+        } catch (InterruptedException e) {
+          LOG.error("Interupted when trying to recover WAL log {}", f, e);
+          throw new IOException("Unable to recover " + writingDir + f);
+        }
+      }
 
-    // move all writing into the logged dir.
-    for (String f : writingDir.list()) {
-      try {
-        recoverLog(writingDir, f);
-      } catch (InterruptedException e) {
-        LOG.error("Interupted when trying to recover WAL log {}", f, e);
-        throw new IOException("Unable to recover " + writingDir + f);
+      // move all sending into the logged dir
+      for (String f : sendingDir.list()) {
+        try {
+          recoverLog(sendingDir, f);
+        } catch (InterruptedException e) {
+          LOG.error("Interupted when trying to recover WAL log {}", f, e);
+          throw new IOException("Unable to recover " + sendingDir + f);
+        }
       }
-    }
 
-    // move all sending into the logged dir
-    for (String f : sendingDir.list()) {
-      try {
-        recoverLog(sendingDir, f);
-      } catch (InterruptedException e) {
-        LOG.error("Interupted when trying to recover WAL log {}", f, e);
-        throw new IOException("Unable to recover " + sendingDir + f);
+      // move all sent into the logged dir.
+      for (String f : sentDir.list()) {
+        try {
+          recoverLog(sentDir, f);
+        } catch (InterruptedException e) {
+          LOG.error("Interupted when trying to recover WAL log {}", f, e);
+          throw new IOException("Unable to recover " + sentDir + f);
+        }
       }
-    }
 
-    // move all sent into the logged dir.
-    for (String f : sentDir.list()) {
-      try {
-        recoverLog(sentDir, f);
-      } catch (InterruptedException e) {
-        LOG.error("Interupted when trying to recover WAL log {}", f, e);
-        throw new IOException("Unable to recover " + sentDir + f);
+      // add all logged to loggedQ and table
+      for (String f : loggedDir.list()) {
+        // File log = new File(loggedDir, f);
+        WALData data = WALData.recovered(f);
+        table.put(f, data);
+        loggedQ.add(f);
+        recoverCount.incrementAndGet();
+        LOG.debug("Recover loaded {}", f);
       }
-    }
 
-    // add all logged to loggedQ and table
-    for (String f : loggedDir.list()) {
-      // File log = new File(loggedDir, f);
-      WALData data = WALData.recovered(f);
-      table.put(f, data);
-      loggedQ.add(f);
-      recoverCount.incrementAndGet();
-      LOG.debug("Recover loaded {}", f);
+      // carry on now on your merry way.
     }
-
-    // carry on now on your merry way.
   }
 
   /**
    * This gets a new sink when rolling, and is called when rolling to a new
    * file.
    */
-  synchronized public EventSink newAckWritingSink(final Tagger tagger,
-      AckListener al) throws IOException {
-    File dir = getDir(State.WRITING);
-    final String tag = tagger.newTag();
-
-    EventSink bareSink = new SeqfileEventSink(
-        new File(dir, tag).getAbsoluteFile());
-    EventSink curSink = new AckChecksumInjector<EventSink>(bareSink,
-        tag.getBytes(), al);
-
-    writingQ.add(tag);
-    WALData data = new WALData(tag);
-    table.put(tag, data);
-    writingCount.incrementAndGet();
+  public EventSink newAckWritingSink(final Tagger tagger, AckListener al)
+      throws IOException {
+    synchronized (lock) {
+      File dir = getDir(State.WRITING);
+      final String tag = tagger.newTag();
+
+      EventSink bareSink = new SeqfileEventSink(
+          new File(dir, tag).getAbsoluteFile());
+      EventSink curSink = new AckChecksumInjector<EventSink>(bareSink,
+          tag.getBytes(), al);
 
-    return new EventSinkDecorator<EventSink>(curSink) {
-      @Override
-      public void append(Event e) throws IOException, InterruptedException {
-        getSink().append(e);
-      }
+      writingQ.add(tag);
+      WALData data = new WALData(tag);
+      table.put(tag, data);
+      writingCount.incrementAndGet();
 
-      @Override
-      public void close() throws IOException, InterruptedException {
-        super.close();
-        synchronized (NaiveFileWALManager.this) {
-          if (!writingQ.contains(tag)) {
-            LOG.warn("Already changed tag {} out of WRITING state", tag);
-            return;
-          }
-          LOG.info("File lives in {}", getFile(tag));
+      return new EventSinkDecorator<EventSink>(curSink) {
+        @Override
+        public void append(Event e) throws IOException, InterruptedException {
+          getSink().append(e);
+        }
 
-          changeState(tag, State.WRITING, State.LOGGED);
-          loggedCount.incrementAndGet();
+        @Override
+        public void close() throws IOException, InterruptedException {
+          super.close();
+          synchronized (lock) {
+            if (!writingQ.contains(tag)) {
+              LOG.warn("Already changed tag {} out of WRITING state", tag);
+              return;
+            }
+            LOG.info("File lives in {}", getFile(tag));
+
+            changeState(tag, State.WRITING, State.LOGGED);
+            loggedCount.incrementAndGet();
+          }
         }
-      }
-    };
+      };
+    }
   }
 
   /**
    * Returns a new sink when the roller asks for a new one.
    */
-  synchronized public EventSink newWritingSink(final Tagger tagger)
-      throws IOException {
-    File dir = getDir(State.WRITING);
-    final String tag = tagger.newTag();
-    EventSink curSink = new SeqfileEventSink(
-        new File(dir, tag).getAbsoluteFile());
-    writingQ.add(tag);
-    WALData data = new WALData(tag);
-    table.put(tag, data);
-
-    return new EventSinkDecorator<EventSink>(curSink) {
-      @Override
-      public void append(Event e) throws IOException, InterruptedException {
-        LOG.debug("Appending event: {}", e); // performance sensitive
-        getSink().append(e);
+  public EventSink newWritingSink(final Tagger tagger) throws IOException {
+    synchronized (lock) {
+      File dir = getDir(State.WRITING);
+      final String tag = tagger.newTag();
+      EventSink curSink = new SeqfileEventSink(
+          new File(dir, tag).getAbsoluteFile());
+      writingQ.add(tag);
+      WALData data = new WALData(tag);
+      table.put(tag, data);
 
-      }
+      return new EventSinkDecorator<EventSink>(curSink) {
+        @Override
+        public void append(Event e) throws IOException, InterruptedException {
+          LOG.debug("Appending event: {}", e); // performance sensitive
+          getSink().append(e);
 
-      @Override
-      public void close() throws IOException, InterruptedException {
-        super.close();
-        changeState(tag, State.WRITING, State.LOGGED);
+        }
 
-      }
-    };
+        @Override
+        public void close() throws IOException, InterruptedException {
+          synchronized (lock) {
+            super.close();
+            changeState(tag, State.WRITING, State.LOGGED);
+          }
+        }
+      };
+    }
   }
 
   @Override
@@ -563,6 +587,12 @@ public class NaiveFileWALManager impleme
     }
   }
 
+  /**
+   * This must only be called from a call site that already holds the lock.
+   * 
+   * @param tag
+   * @return
+   */
   private File getFile(String tag) {
     Preconditions.checkNotNull(tag, "Attempted to get file for empty tag");
     WALData data = table.get(tag);
@@ -580,66 +610,73 @@ public class NaiveFileWALManager impleme
    * This can throw both IOExceptions and runtime exceptions due to
    * Preconditions failures.
    */
-  synchronized void changeState(String tag, State oldState, State newState)
+  void changeState(String tag, State oldState, State newState)
       throws IOException {
-    WALData data = table.get(tag);
-    Preconditions.checkArgument(data != null, "Tag " + tag + " has no data");
-    Preconditions.checkArgument(tag.equals(data.tag),
-        "Data associated with tag didn't match tag " + tag);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Change " + data.s + "/" + oldState + " to " + newState + " : "
-          + tag);
-    }
-
-    // null allows any previous state.
-    if (oldState == null) {
-      oldState = data.s;
-    }
-
-    Preconditions.checkState(data.s == oldState, "Expected state to be "
-        + oldState + " but was " + data.s);
-
-    if (oldState == State.ERROR) {
-      throw new IllegalStateException("Cannot move from error state");
-    }
-
-    /*
-     * This uses java's File.rename and File.delete method. According to the
-     * link below, Solaris (I assume POSIX/linux) does atomic rename but Windows
-     * does not guarantee it.
-     * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4017593 To be truly
-     * correct, I need to check the return value (will likely fail in unix if
-     * moving from one volume to another instead of just within same volume)
-     */
-
-    // move files to other directories to making state change durable.
-    File orig = getFile(tag);
-    File newf = new File(getDir(newState), tag);
-    boolean success = orig.renameTo(newf);
-    if (!success) {
-      throw new IOException("Move  " + orig + " -> " + newf + "failed!");
-    }
-
-    // E2EACKED is terminal state just delete it.
-    // TODO (jon) add option to keep logged files
-    if (newState == State.E2EACKED) {
-      LOG.debug("Deleting WAL file: {}", newf.getAbsoluteFile());
-      boolean res = newf.delete();
-      if (!res) {
-        LOG.warn("Failed to delete complete WAL file: {}",
-            newf.getAbsoluteFile());
+    synchronized (lock) {
+      WALData data = table.get(tag);
+      Preconditions.checkArgument(data != null, "Tag " + tag + " has no data");
+      Preconditions.checkArgument(tag.equals(data.tag),
+          "Data associated with tag didn't match tag " + tag);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Change " + data.s + "/" + oldState + " to " + newState
+            + " : " + tag);
+      }
+
+      // null allows any previous state.
+      if (oldState == null) {
+        oldState = data.s;
+      }
+
+      if (oldState == State.SENT && newState == State.LOGGED) {
+        data.incrementSentCount();
+      }
+
+      Preconditions.checkState(data.s == oldState, "Expected state to be "
+          + oldState + " but was " + data.s);
+
+      if (oldState == State.ERROR) {
+        throw new IllegalStateException("Cannot move from error state");
+      }
+
+      /*
+       * This uses java's File.rename and File.delete method. According to the
+       * link below, Solaris (I assume POSIX/linux) does atomic rename but
+       * Windows does not guarantee it.
+       * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4017593 To be truly
+       * correct, I need to check the return value (will likely fail in unix if
+       * moving from one volume to another instead of just within same volume)
+       */
+
+      // move files to other directories to making state change durable.
+      File orig = getFile(tag);
+      File newf = new File(getDir(newState), tag);
+      boolean success = orig.renameTo(newf);
+      if (!success) {
+        throw new IOException("Move  " + orig + " -> " + newf + "failed!");
+      }
+
+      // E2EACKED is terminal state just delete it.
+      // TODO (jon) add option to keep logged files
+      if (newState == State.E2EACKED) {
+        LOG.debug("Deleting WAL file: {}", newf.getAbsoluteFile());
+        boolean res = newf.delete();
+        if (!res) {
+          LOG.warn("Failed to delete complete WAL file: {}",
+              newf.getAbsoluteFile());
+        }
+        table.remove(tag);
       }
-    }
 
-    // is successful, update queues.
-    LOG.debug("old state is {}", oldState);
-    getQueue(oldState).remove(tag);
-    BlockingQueue<String> q = getQueue(newState);
-    if (q != null) {
-      q.add(tag);
+      // is successful, update queues.
+      LOG.debug("old state is {}", oldState);
+      getQueue(oldState).remove(tag);
+      BlockingQueue<String> q = getQueue(newState);
+      if (q != null) {
+        q.add(tag);
+      }
+      data.s = newState;
     }
-    data.s = newState;
   }
 
   /**
@@ -731,7 +768,7 @@ public class NaiveFileWALManager impleme
         // exit condition is when closed is flagged and the queue is empty.
 
         if (sendingTag == null) {
-          synchronized (this) {
+          synchronized (lock) {
             if (shuttingDown && loggedQ.isEmpty() && sendingQ.isEmpty())
               return null;
           }
@@ -739,7 +776,7 @@ public class NaiveFileWALManager impleme
       }
     } catch (InterruptedException e) {
       LOG.error("interrupted", e);
-      synchronized (this) {
+      synchronized (lock) {
         if (!shuttingDown) {
           LOG.warn("!!! Caught interrupted exception but not closed so rethrowing interrupted. loggedQ:"
               + loggedQ.size() + " sendingQ:" + sendingQ.size());
@@ -757,78 +794,99 @@ public class NaiveFileWALManager impleme
         }
       }
     }
-    LOG.info("opening log file {}", sendingTag);
-    changeState(sendingTag, State.LOGGED, State.SENDING);
-    sendingCount.incrementAndGet();
-    File curFile = getFile(sendingTag);
-    EventSource curSource = new SeqfileEventSource(curFile.getAbsolutePath());
-    return new StateChangeDeco(curSource, sendingTag);
+    synchronized (lock) {
+      LOG.info("opening log file {}", sendingTag);
+      changeState(sendingTag, State.LOGGED, State.SENDING);
+      sendingCount.incrementAndGet();
+      File curFile = getFile(sendingTag);
+      EventSource curSource = new SeqfileEventSource(curFile.getAbsolutePath());
+      return new StateChangeDeco(curSource, sendingTag);
+    }
   }
 
   /**
    * Convert a tag from Sent to end-to-end acked.
    */
-  synchronized public void toAcked(String tag) throws IOException {
-    changeState(tag, State.SENT, State.E2EACKED);
-    ackedCount.incrementAndGet();
+  public void toAcked(String tag) throws IOException {
+    synchronized (lock) {
+      WALData wd = getWalData(tag);
+      if (wd.s == State.LOGGED && wd.getSentCount() > 0) {
+        changeState(tag, State.LOGGED, State.E2EACKED);
+        ackedCount.incrementAndGet();
+
+      } else if (wd.s == State.SENT) {
+        changeState(tag, State.SENT, State.E2EACKED);
+        ackedCount.incrementAndGet();
+      }
+
+    }
   }
 
   /**
    * Change something that is sent and not acknowledged to logged state so that
    * the normal mechanisms will eventually retry sending it.
    */
-  synchronized public void retry(String tag) throws IOException {
-    // Yuck. This is like a CAS right now.
-    WALData data = table.get(tag);
-    if (data == null) {
-      // wrong WALManager
-      return;
-    }
-    if (data != null) {
-      switch (data.s) {
-      case SENDING: {
-        // This is possible if a connection goes down. If we are currently
-        // sending this group, we should continue trying to send (no need to
-        // restarting it by demoting to LOGGED)
-        LOG.info("Attempt to retry chunk in SENDING state.  Data is being sent so "
-            + "there is no need for state transition.");
-        break;
-      }
-      case LOGGED: {
-        // This is likely the most common case where we retry events spooled to
-        // disk
-        LOG.info("Attempt to retry chunk in LOGGED state.  There is no need "
-            + "for state transition.");
-        break;
-      }
-      case SENT: {
-        // This is possible if the collector goes down or if endpoint (HDFS)
-        // goes down. Here we demote the chunk back to LOGGED state.
-        changeState(tag, State.SENT, State.LOGGED);
-        retryCount.incrementAndGet();
-        break;
-      }
-      case E2EACKED: {
-        // This is possible but very unlikely. If a group is in this state it is
-        // about to be deleted and thus doesn't need a state transition.
-        LOG.debug("Attemp to retry chunk in E2EACKED state. There is no "
-            + "need to retry because data is acked.");
-        break;
-      }
-
-      case ERROR: // should never happen
-        LOG.info("Attempt to retry chunk in ERROR state.  Data in ERROR "
-            + "state stays in ERROR state so no transition.");
-        break;
-
-      case IMPORT: // should never happen
-      case WRITING: // should never happen
-      default: {
-        String msg = "Attempting to retry from a state " + data.s
-            + " which is a state do not ever retry from.";
-        LOG.error(msg);
-        throw new IllegalStateException(msg);
+
+  public void retry(String tag) throws IOException {
+    synchronized (lock) {
+      // Yuck. This is like a CAS right now.
+      WALData data = table.get(tag);
+      if (data == null) {
+        // wrong WALManager
+        return;
       }
+      if (data != null) {
+        switch (data.s) {
+        case SENDING: {
+          // This is possible if a connection goes down. If we are currently
+          // sending this group, we should continue trying to send (no need to
+          // restarting it by demoting to LOGGED)
+          LOG.info("Attempt to retry chunk '" + tag
+              + "' in SENDING state.  Data is being sent so "
+              + "there is no need for state transition.");
+          break;
+        }
+        case LOGGED: {
+          // This is likely the most common case where we retry events spooled
+          // to
+          // disk
+          LOG.info("Attempt to retry chunk '" + tag
+              + "'  in LOGGED state.  There is no need "
+              + "for state transition.");
+          break;
+        }
+        case SENT: {
+          // This is possible if the collector goes down or if endpoint (HDFS)
+          // goes down. Here we demote the chunk back to LOGGED state.
+          changeState(tag, State.SENT, State.LOGGED);
+          retryCount.incrementAndGet();
+          break;
+        }
+        case E2EACKED: {
+          // This is possible but very unlikely. If a group is in this state it
+          // is
+          // about to be deleted and thus doesn't need a state transition.
+          LOG.debug("Attempt to retry chunk  '" + tag
+              + "' in E2EACKED state. There is no "
+              + "need to retry because data is acked.");
+          break;
+        }
+
+        case ERROR: // should never happen
+          LOG.info("Attempt to retry chunk '" + tag
+              + "' from ERROR state.  Data in ERROR "
+              + "state stays in ERROR state so no transition.");
+          break;
+
+        case IMPORT: // should never happen
+        case WRITING: // should never happen
+        default: {
+          String msg = "Attempt to retry from a state " + data.s
+              + " which is a state do not ever retry from.";
+          LOG.error(msg);
+          throw new IllegalStateException(msg);
+        }
+        }
       }
     }
   }
@@ -848,7 +906,7 @@ public class NaiveFileWALManager impleme
 
       // add to logging queue
       WALData data = WALData.recovered(fn);
-      synchronized (this) {
+      synchronized (lock) {
         table.put(fn, data);
         loggedQ.add(fn);
         importedCount.incrementAndGet();
@@ -862,7 +920,7 @@ public class NaiveFileWALManager impleme
   }
 
   @Override
-  synchronized public ReportEvent getMetrics() {
+  public ReportEvent getMetrics() {
     ReportEvent rpt = new ReportEvent(getName());
 
     // historical counts
@@ -893,9 +951,16 @@ public class NaiveFileWALManager impleme
   }
 
   @Override
-  synchronized public boolean isEmpty() {
-    return writingQ.isEmpty() && loggedQ.isEmpty() && sendingQ.isEmpty()
-        && sentQ.isEmpty();
+  public boolean isEmpty() {
+    synchronized (lock) {
+      return writingQ.isEmpty() && loggedQ.isEmpty() && sendingQ.isEmpty()
+          && sentQ.isEmpty();
+    }
+  }
+
+  // Exposed for testing
+  WALData getWalData(String tag) {
+    return table.get(tag);
   }
 
 }

Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java?rev=1159825&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java Sat Aug 20 00:48:50 2011
@@ -0,0 +1,360 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.agent.durability;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.agent.FlumeNodeWALNotifier;
+import com.cloudera.flume.agent.durability.NaiveFileWALManager.WALData;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.handlers.endtoend.AckListener;
+import com.cloudera.flume.handlers.rolling.Tagger;
+import com.cloudera.util.Clock;
+import com.cloudera.util.FileUtil;
+
+/**
+ * This test case exercises all the state transitions found when data goes
+ * through the write ahead log and comes out the other side.
+ */
+public class TestFlumeNodeWALNotifierRacy {
+  Logger LOG = LoggerFactory.getLogger(TestFlumeNodeWALNotifierRacy.class);
+
+  Date date;
+  CannedTagger tagger;
+  AckListener mockAl;
+  File tmpdir;
+  NaiveFileWALManager walman;
+  Map<String, WALManager> map;
+  EventSink snk;
+  EventSource src;
+
+  /**
+   * This issues a simple incrementing integer, rendered as a string, as the
+   * tag for the next wal file.
+   */
+  static class CannedTagger implements Tagger {
+    int cur = 0;
+    List<String> tags = Collections.synchronizedList(new ArrayList<String>());
+
+    CannedTagger() {
+    }
+
+    @Override
+    public String getTag() {
+      return Integer.toString(cur);
+    }
+
+    @Override
+    public String newTag() {
+      cur++;
+      String tag = Integer.toString(cur);
+      tags.add(tag);
+      return tag;
+    }
+
+    @Override
+    public Date getDate() {
+      return null;
+    }
+
+    List<String> getTags() {
+      return tags;
+    }
+  }
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    date = new Date();
+
+    tagger = new CannedTagger();
+    mockAl = mock(AckListener.class);
+
+    tmpdir = FileUtil.mktempdir();
+    walman = new NaiveFileWALManager(tmpdir);
+    walman.open();
+
+    map = new HashMap<String, WALManager>();
+    map.put("wal", walman);
+  }
+
+  @After
+  public void teardown() throws IOException {
+    FileUtil.rmr(tmpdir);
+  }
+
+  /**
+   * Attempt a retry on the specified tag. It is important that there is no
+   * synchronization on this -- concurrency needs to be handled by the wal
+   * sources and walmanager code.
+   * 
+   * @param tag
+   * @throws IOException
+   */
+  public void triggerRetry(String tag) throws IOException {
+    FlumeNodeWALNotifier notif = new FlumeNodeWALNotifier(map);
+    notif.retry(tag);
+  }
+
+  /**
+   * Transition to writing state.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void toWritingState() throws IOException, InterruptedException {
+    snk = walman.newAckWritingSink(tagger, mockAl);
+    EventImpl evt = new EventImpl("foofoodata".getBytes());
+    snk.open();
+    snk.append(evt);
+  }
+
+  /**
+   * Transition from writing to logged state
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void toLoggedState() throws IOException, InterruptedException {
+    // transition to logged state.
+    snk.close();
+  }
+
+  /**
+   * Transition from logged to sending state.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void toSendingState() throws IOException, InterruptedException {
+    // transition to sending state.
+    src = walman.getUnackedSource();
+    src.open();
+    while (src.next() != null) {
+      ;
+    }
+  }
+
+  /**
+   * Transition from sending to sent state.
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  void toSentState() throws IOException, InterruptedException {
+    // transition to sent state.
+    src.close();
+  }
+
+  /**
+   * Transition from sent to acked state
+   * 
+   * @param tag
+   * @throws IOException
+   */
+  void toAckedState(String tag) throws IOException {
+    // transition to acked state: ask the WAL manager directly to mark the tag
+    // as acked.
+    walman.toAcked(tag);
+  }
+
+  /**
+   * Make one state transition based on the current state of the wal tag. The
+   * key point here is that even though there is no locking here, retries can
+   * happen anywhere between transitions and the data still only makes proper
+   * state transitions.
+   * 
+   * @param tag
+   * @return isDone (either in E2EACKED state, or no longer present)
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  boolean step(String tag) throws IOException, InterruptedException {
+    WALData wd = walman.getWalData(tag);
+    if (wd == null) {
+      return true;
+    }
+    switch (wd.s) {
+    case WRITING:
+      LOG.info("LOGGED  tag '" + tag + "'");
+      toLoggedState();
+      return false;
+    case LOGGED:
+      LOG.info("SENDING tag '" + tag + "'");
+      toSendingState();
+      return false;
+    case SENDING:
+      LOG.info("SENT    tag '" + tag + "'");
+      toSentState();
+      return false;
+    case SENT:
+      LOG.info("ACKED   tag '" + tag + "'");
+      toAckedState(tag);
+      return false;
+    case E2EACKED:
+      return true;
+    default:
+      throw new IllegalStateException("Unexpected state " + wd.s);
+    }
+  }
+
+  /**
+   * Run the test for 10000 wal files. On laptop this normally finishes in 20s,
+   * so timeout after 100s (100000ms).
+   * 
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test(timeout = 100000)
+  public void testRetryWriting() throws IOException, InterruptedException {
+    // 10000 wal files, each rolled after a single event.
+    retryWritingRacynessRun(10000);
+  }
+
+  /**
+   * The test manages threads -- one thread marching through states and
+   * another that introduces many many retry attempts.
+   * 
+   * @param count
+   * @throws InterruptedException
+   */
+  void retryWritingRacynessRun(final int count) throws InterruptedException {
+    // One count per worker thread; zero means both threads have finished.
+    final CountDownLatch done = new CountDownLatch(2);
+    // Holds the retry thread back until the writer has produced its first tag
+    // or has stopped (normally or with an error).
+    final CountDownLatch canRetry = new CountDownLatch(1);
+
+    // Thread 1 is creating new log data and marching each tag through its
+    // states.
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          for (int i = 0; i < count; i++) {
+            // there is an off by one thing here: the tag for iteration i is
+            // looked up as i + 1.
+            String tag = Integer.toString(i + 1);
+            LOG.info("WRITING tag '" + tag + "'");
+            toWritingState();
+            // first time will allow retry thread to start, otherwise do nothing
+            canRetry.countDown();
+            while (!step(tag)) {
+              LOG.info("Stepping on tag " + tag);
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("This wasn't supposed to happen", e);
+        } finally {
+          done.countDown();
+          // Always release the retry thread. Without this, a run with
+          // count == 0 or a failure before the first tag was written would
+          // leave thread 2 blocked in canRetry.await() forever, and
+          // done.await() below would never return.
+          canRetry.countDown();
+        }
+      }
+    };
+    t.start();
+
+    // thread 2 is periodically triggering retries.
+    Thread t2 = new Thread() {
+      @Override
+      public void run() {
+        // Named differently from the enclosing method's 'count' parameter to
+        // avoid shadowing it.
+        int retries = 0;
+        try {
+          canRetry.await();
+          // Keep going for as long as the writer thread has not counted down.
+          while (done.getCount() > 1) {
+            List<String> tags = tagger.getTags();
+            Clock.sleep(10);
+
+            // Key point -- read of the tag and the retry don't clash with other
+            // threads state transition, even though no lock is taken here.
+            // The size is snapshotted and the list walked by index rather than
+            // by iterator (presumably because the writer thread may still be
+            // adding tags -- confirm against the tagger implementation).
+            int sz = tags.size();
+            for (int i = 0; i < sz; i++) {
+              String tag = tags.get(i);
+              WALData wd = walman.getWalData(tag);
+              if (!walman.getWritingTags().contains(tag) && wd != null) {
+                // any state but writing
+                triggerRetry(tag);
+                LOG.info("Forcing retry on tag '" + tag + "'");
+                retries++;
+              }
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("This wasn't supposed to happen either", e);
+        } finally {
+          done.countDown();
+          LOG.info("Issued {} retries", retries);
+        }
+      }
+    };
+    t2.start();
+
+    done.await();
+
+    // Once both threads are finished no tag may be left in an intermediate
+    // state.
+    assertEquals(0, walman.getWritingTags().size());
+    assertEquals(0, walman.getLoggedTags().size());
+    assertEquals(0, walman.getSendingTags().size());
+    assertEquals(0, walman.getSentTags().size());
+  }
+
+  /**
+   * You can run this test from the command line for an extended racyness
+   * testing.
+   */
+  public static void main(String[] argv) {
+    // Exactly one argument is expected and it must parse as an integer;
+    // anything else gets the usage message instead of a raw stack trace.
+    int count = -1;
+    boolean badArgs = (argv.length != 1);
+    if (!badArgs) {
+      try {
+        count = Integer.parseInt(argv[0]);
+      } catch (NumberFormatException e) {
+        badArgs = true;
+      }
+    }
+    if (badArgs) {
+      System.err.println("usage: "
+          + TestFlumeNodeWALNotifierRacy.class.getSimpleName() + " <n>");
+      System.err
+          .println("  Run wal racyness test sending <n> events.  This rolls "
+              + "a new log file after every event and also attempts to "
+              + "a retry every 10ms.");
+      System.exit(1);
+    }
+
+    TestFlumeNodeWALNotifierRacy test = new TestFlumeNodeWALNotifierRacy();
+    try {
+      test.setup();
+      try {
+        test.retryWritingRacynessRun(count);
+      } finally {
+        // Clean up the temp directory even when the run itself fails.
+        test.teardown();
+      }
+    } catch (Exception e) {
+      System.err.println("Test failed");
+      e.printStackTrace();
+      System.exit(2);
+    } catch (AssertionError e) {
+      // Failed assertions are Errors, not Exceptions; report them the same
+      // way instead of letting them escape past the failure handling.
+      System.err.println("Test failed");
+      e.printStackTrace();
+      System.exit(2);
+    }
+    // success!
+  }
+}