You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by jm...@apache.org on 2011/08/20 02:48:50 UTC
svn commit: r1159825 - in /incubator/flume/trunk/flume-core/src:
main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
Author: jmhsieh
Date: Sat Aug 20 00:48:50 2011
New Revision: 1159825
URL: http://svn.apache.org/viewvc?rev=1159825&view=rev
Log:
FLUME-745: Race condition in NaiveFileWALDeco and retransmit logic
- Setup test to run for a long time exacerbating potential race every 10ms.
- Made test runnable from command line for arbitrary iterations
- Eliminated possible memory leak by removing the WALData entry once a tag is e2eacked
- NaiveFileWALDeco to use object lock
Added:
incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
Modified:
incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
Modified: incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java?rev=1159825&r1=1159824&r2=1159825&view=diff
==============================================================================
--- incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java (original)
+++ incubator/flume/trunk/flume-core/src/main/java/com/cloudera/flume/agent/durability/NaiveFileWALManager.java Sat Aug 20 00:48:50 2011
@@ -101,18 +101,33 @@ public class NaiveFileWALManager impleme
private volatile boolean shuttingDown = false;
+ private Object lock = new Object();
+
/**
* Simple record for keeping the state of tag.
*/
static class WALData {
State s;
String tag;
+ int sentCount = 0;
WALData(String tag) {
this.s = State.WRITING;
this.tag = tag;
}
+ void incrementSentCount() {
+ sentCount++;
+ }
+
+ int getSentCount() {
+ return sentCount;
+ }
+
+ public String toString() {
+ return "WALData { " + tag + " state:" + s + " sent:" + sentCount + "}";
+ }
+
static WALData recovered(String tag) {
WALData data = new WALData(tag);
data.s = State.LOGGED;
@@ -148,35 +163,37 @@ public class NaiveFileWALManager impleme
this.baseDir = baseDir;
}
- synchronized public void open() throws IOException {
- // make the dirs if they do not exist
- if (!FileUtil.makeDirs(importDir)) {
- throw new IOException("Unable to create import dir: " + importDir);
- }
- if (!FileUtil.makeDirs(writingDir)) {
- throw new IOException("Unable to create writing dir: " + writingDir);
- }
- if (!FileUtil.makeDirs(loggedDir)) {
- throw new IOException("Unable to create logged dir: " + loggedDir);
- }
- if (!FileUtil.makeDirs(sendingDir)) {
- throw new IOException("Unable to create sending dir: " + sendingDir);
- }
- if (!FileUtil.makeDirs(sentDir)) {
- throw new IOException("Unable to create import dir: " + sentDir);
- }
- if (!FileUtil.makeDirs(doneDir)) {
- throw new IOException("Unable to create writing dir: " + doneDir);
- }
- if (!FileUtil.makeDirs(errorDir)) {
- throw new IOException("Unable to create logged dir: " + errorDir);
- }
+ public void open() throws IOException {
+ synchronized (lock) {
+ // make the dirs if they do not exist
+ if (!FileUtil.makeDirs(importDir)) {
+ throw new IOException("Unable to create import dir: " + importDir);
+ }
+ if (!FileUtil.makeDirs(writingDir)) {
+ throw new IOException("Unable to create writing dir: " + writingDir);
+ }
+ if (!FileUtil.makeDirs(loggedDir)) {
+ throw new IOException("Unable to create logged dir: " + loggedDir);
+ }
+ if (!FileUtil.makeDirs(sendingDir)) {
+ throw new IOException("Unable to create sending dir: " + sendingDir);
+ }
+ if (!FileUtil.makeDirs(sentDir)) {
+ throw new IOException("Unable to create import dir: " + sentDir);
+ }
+ if (!FileUtil.makeDirs(doneDir)) {
+ throw new IOException("Unable to create writing dir: " + doneDir);
+ }
+ if (!FileUtil.makeDirs(errorDir)) {
+ throw new IOException("Unable to create logged dir: " + errorDir);
+ }
- if (shuttingDown) {
- LOG.warn("Strange, shutting down but now reopening");
+ if (shuttingDown) {
+ LOG.warn("Strange, shutting down but now reopening");
+ }
+ shuttingDown = false;
+ LOG.info("NaiveFileWALManager is now open");
}
- shuttingDown = false;
- LOG.info("NaiveFileWALManager is now open");
}
public Collection<String> getWritingTags() {
@@ -200,12 +217,14 @@ public class NaiveFileWALManager impleme
*
* This is not a blocking close.
*/
- synchronized public void stopDrains() throws IOException {
- if (shuttingDown) {
- LOG.warn("Already shutting down, but getting another shutting down notice, odd");
+ public void stopDrains() throws IOException {
+ synchronized (lock) {
+ if (shuttingDown) {
+ LOG.warn("Already shutting down, but getting another shutting down notice, odd");
+ }
+ shuttingDown = true;
+ LOG.info("NaiveFileWALManager shutting down");
}
- shuttingDown = true;
- LOG.info("NaiveFileWALManager shutting down");
}
/**
@@ -393,121 +412,126 @@ public class NaiveFileWALManager impleme
* restart from there. Optimizations can recover at finer grain and be more
* performant.
*/
- synchronized public void recover() throws IOException {
+ public void recover() throws IOException {
+ synchronized (lock) {
+ // move all writing into the logged dir.
+ for (String f : writingDir.list()) {
+ try {
+ recoverLog(writingDir, f);
+ } catch (InterruptedException e) {
+ LOG.error("Interupted when trying to recover WAL log {}", f, e);
+ throw new IOException("Unable to recover " + writingDir + f);
+ }
+ }
- // move all writing into the logged dir.
- for (String f : writingDir.list()) {
- try {
- recoverLog(writingDir, f);
- } catch (InterruptedException e) {
- LOG.error("Interupted when trying to recover WAL log {}", f, e);
- throw new IOException("Unable to recover " + writingDir + f);
+ // move all sending into the logged dir
+ for (String f : sendingDir.list()) {
+ try {
+ recoverLog(sendingDir, f);
+ } catch (InterruptedException e) {
+ LOG.error("Interupted when trying to recover WAL log {}", f, e);
+ throw new IOException("Unable to recover " + sendingDir + f);
+ }
}
- }
- // move all sending into the logged dir
- for (String f : sendingDir.list()) {
- try {
- recoverLog(sendingDir, f);
- } catch (InterruptedException e) {
- LOG.error("Interupted when trying to recover WAL log {}", f, e);
- throw new IOException("Unable to recover " + sendingDir + f);
+ // move all sent into the logged dir.
+ for (String f : sentDir.list()) {
+ try {
+ recoverLog(sentDir, f);
+ } catch (InterruptedException e) {
+ LOG.error("Interupted when trying to recover WAL log {}", f, e);
+ throw new IOException("Unable to recover " + sentDir + f);
+ }
}
- }
- // move all sent into the logged dir.
- for (String f : sentDir.list()) {
- try {
- recoverLog(sentDir, f);
- } catch (InterruptedException e) {
- LOG.error("Interupted when trying to recover WAL log {}", f, e);
- throw new IOException("Unable to recover " + sentDir + f);
+ // add all logged to loggedQ and table
+ for (String f : loggedDir.list()) {
+ // File log = new File(loggedDir, f);
+ WALData data = WALData.recovered(f);
+ table.put(f, data);
+ loggedQ.add(f);
+ recoverCount.incrementAndGet();
+ LOG.debug("Recover loaded {}", f);
}
- }
- // add all logged to loggedQ and table
- for (String f : loggedDir.list()) {
- // File log = new File(loggedDir, f);
- WALData data = WALData.recovered(f);
- table.put(f, data);
- loggedQ.add(f);
- recoverCount.incrementAndGet();
- LOG.debug("Recover loaded {}", f);
+ // carry on now on your merry way.
}
-
- // carry on now on your merry way.
}
/**
* This gets a new sink when rolling, and is called when rolling to a new
* file.
*/
- synchronized public EventSink newAckWritingSink(final Tagger tagger,
- AckListener al) throws IOException {
- File dir = getDir(State.WRITING);
- final String tag = tagger.newTag();
-
- EventSink bareSink = new SeqfileEventSink(
- new File(dir, tag).getAbsoluteFile());
- EventSink curSink = new AckChecksumInjector<EventSink>(bareSink,
- tag.getBytes(), al);
-
- writingQ.add(tag);
- WALData data = new WALData(tag);
- table.put(tag, data);
- writingCount.incrementAndGet();
+ public EventSink newAckWritingSink(final Tagger tagger, AckListener al)
+ throws IOException {
+ synchronized (lock) {
+ File dir = getDir(State.WRITING);
+ final String tag = tagger.newTag();
+
+ EventSink bareSink = new SeqfileEventSink(
+ new File(dir, tag).getAbsoluteFile());
+ EventSink curSink = new AckChecksumInjector<EventSink>(bareSink,
+ tag.getBytes(), al);
- return new EventSinkDecorator<EventSink>(curSink) {
- @Override
- public void append(Event e) throws IOException, InterruptedException {
- getSink().append(e);
- }
+ writingQ.add(tag);
+ WALData data = new WALData(tag);
+ table.put(tag, data);
+ writingCount.incrementAndGet();
- @Override
- public void close() throws IOException, InterruptedException {
- super.close();
- synchronized (NaiveFileWALManager.this) {
- if (!writingQ.contains(tag)) {
- LOG.warn("Already changed tag {} out of WRITING state", tag);
- return;
- }
- LOG.info("File lives in {}", getFile(tag));
+ return new EventSinkDecorator<EventSink>(curSink) {
+ @Override
+ public void append(Event e) throws IOException, InterruptedException {
+ getSink().append(e);
+ }
- changeState(tag, State.WRITING, State.LOGGED);
- loggedCount.incrementAndGet();
+ @Override
+ public void close() throws IOException, InterruptedException {
+ super.close();
+ synchronized (lock) {
+ if (!writingQ.contains(tag)) {
+ LOG.warn("Already changed tag {} out of WRITING state", tag);
+ return;
+ }
+ LOG.info("File lives in {}", getFile(tag));
+
+ changeState(tag, State.WRITING, State.LOGGED);
+ loggedCount.incrementAndGet();
+ }
}
- }
- };
+ };
+ }
}
/**
* Returns a new sink when the roller asks for a new one.
*/
- synchronized public EventSink newWritingSink(final Tagger tagger)
- throws IOException {
- File dir = getDir(State.WRITING);
- final String tag = tagger.newTag();
- EventSink curSink = new SeqfileEventSink(
- new File(dir, tag).getAbsoluteFile());
- writingQ.add(tag);
- WALData data = new WALData(tag);
- table.put(tag, data);
-
- return new EventSinkDecorator<EventSink>(curSink) {
- @Override
- public void append(Event e) throws IOException, InterruptedException {
- LOG.debug("Appending event: {}", e); // performance sensitive
- getSink().append(e);
+ public EventSink newWritingSink(final Tagger tagger) throws IOException {
+ synchronized (lock) {
+ File dir = getDir(State.WRITING);
+ final String tag = tagger.newTag();
+ EventSink curSink = new SeqfileEventSink(
+ new File(dir, tag).getAbsoluteFile());
+ writingQ.add(tag);
+ WALData data = new WALData(tag);
+ table.put(tag, data);
- }
+ return new EventSinkDecorator<EventSink>(curSink) {
+ @Override
+ public void append(Event e) throws IOException, InterruptedException {
+ LOG.debug("Appending event: {}", e); // performance sensitive
+ getSink().append(e);
- @Override
- public void close() throws IOException, InterruptedException {
- super.close();
- changeState(tag, State.WRITING, State.LOGGED);
+ }
- }
- };
+ @Override
+ public void close() throws IOException, InterruptedException {
+ synchronized (lock) {
+ super.close();
+ changeState(tag, State.WRITING, State.LOGGED);
+ }
+ }
+ };
+ }
}
@Override
@@ -563,6 +587,12 @@ public class NaiveFileWALManager impleme
}
}
+ /**
+ * This needs to be called from a lock protected call site
+ *
+ * @param tag
+ * @return
+ */
private File getFile(String tag) {
Preconditions.checkNotNull(tag, "Attempted to get file for empty tag");
WALData data = table.get(tag);
@@ -580,66 +610,73 @@ public class NaiveFileWALManager impleme
* This can throw both IOExceptions and runtime exceptions due to
* Preconditions failures.
*/
- synchronized void changeState(String tag, State oldState, State newState)
+ void changeState(String tag, State oldState, State newState)
throws IOException {
- WALData data = table.get(tag);
- Preconditions.checkArgument(data != null, "Tag " + tag + " has no data");
- Preconditions.checkArgument(tag.equals(data.tag),
- "Data associated with tag didn't match tag " + tag);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Change " + data.s + "/" + oldState + " to " + newState + " : "
- + tag);
- }
-
- // null allows any previous state.
- if (oldState == null) {
- oldState = data.s;
- }
-
- Preconditions.checkState(data.s == oldState, "Expected state to be "
- + oldState + " but was " + data.s);
-
- if (oldState == State.ERROR) {
- throw new IllegalStateException("Cannot move from error state");
- }
-
- /*
- * This uses java's File.rename and File.delete method. According to the
- * link below, Solaris (I assume POSIX/linux) does atomic rename but Windows
- * does not guarantee it.
- * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4017593 To be truly
- * correct, I need to check the return value (will likely fail in unix if
- * moving from one volume to another instead of just within same volume)
- */
-
- // move files to other directories to making state change durable.
- File orig = getFile(tag);
- File newf = new File(getDir(newState), tag);
- boolean success = orig.renameTo(newf);
- if (!success) {
- throw new IOException("Move " + orig + " -> " + newf + "failed!");
- }
-
- // E2EACKED is terminal state just delete it.
- // TODO (jon) add option to keep logged files
- if (newState == State.E2EACKED) {
- LOG.debug("Deleting WAL file: {}", newf.getAbsoluteFile());
- boolean res = newf.delete();
- if (!res) {
- LOG.warn("Failed to delete complete WAL file: {}",
- newf.getAbsoluteFile());
+ synchronized (lock) {
+ WALData data = table.get(tag);
+ Preconditions.checkArgument(data != null, "Tag " + tag + " has no data");
+ Preconditions.checkArgument(tag.equals(data.tag),
+ "Data associated with tag didn't match tag " + tag);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Change " + data.s + "/" + oldState + " to " + newState
+ + " : " + tag);
+ }
+
+ // null allows any previous state.
+ if (oldState == null) {
+ oldState = data.s;
+ }
+
+ if (oldState == State.SENT && newState == State.LOGGED) {
+ data.incrementSentCount();
+ }
+
+ Preconditions.checkState(data.s == oldState, "Expected state to be "
+ + oldState + " but was " + data.s);
+
+ if (oldState == State.ERROR) {
+ throw new IllegalStateException("Cannot move from error state");
+ }
+
+ /*
+ * This uses java's File.rename and File.delete method. According to the
+ * link below, Solaris (I assume POSIX/linux) does atomic rename but
+ * Windows does not guarantee it.
+ * http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4017593 To be truly
+ * correct, I need to check the return value (will likely fail in unix if
+ * moving from one volume to another instead of just within same volume)
+ */
+
+ // move files to other directories to making state change durable.
+ File orig = getFile(tag);
+ File newf = new File(getDir(newState), tag);
+ boolean success = orig.renameTo(newf);
+ if (!success) {
+ throw new IOException("Move " + orig + " -> " + newf + "failed!");
+ }
+
+ // E2EACKED is terminal state just delete it.
+ // TODO (jon) add option to keep logged files
+ if (newState == State.E2EACKED) {
+ LOG.debug("Deleting WAL file: {}", newf.getAbsoluteFile());
+ boolean res = newf.delete();
+ if (!res) {
+ LOG.warn("Failed to delete complete WAL file: {}",
+ newf.getAbsoluteFile());
+ }
+ table.remove(tag);
}
- }
- // is successful, update queues.
- LOG.debug("old state is {}", oldState);
- getQueue(oldState).remove(tag);
- BlockingQueue<String> q = getQueue(newState);
- if (q != null) {
- q.add(tag);
+ // is successful, update queues.
+ LOG.debug("old state is {}", oldState);
+ getQueue(oldState).remove(tag);
+ BlockingQueue<String> q = getQueue(newState);
+ if (q != null) {
+ q.add(tag);
+ }
+ data.s = newState;
}
- data.s = newState;
}
/**
@@ -731,7 +768,7 @@ public class NaiveFileWALManager impleme
// exit condition is when closed is flagged and the queue is empty.
if (sendingTag == null) {
- synchronized (this) {
+ synchronized (lock) {
if (shuttingDown && loggedQ.isEmpty() && sendingQ.isEmpty())
return null;
}
@@ -739,7 +776,7 @@ public class NaiveFileWALManager impleme
}
} catch (InterruptedException e) {
LOG.error("interrupted", e);
- synchronized (this) {
+ synchronized (lock) {
if (!shuttingDown) {
LOG.warn("!!! Caught interrupted exception but not closed so rethrowing interrupted. loggedQ:"
+ loggedQ.size() + " sendingQ:" + sendingQ.size());
@@ -757,78 +794,99 @@ public class NaiveFileWALManager impleme
}
}
}
- LOG.info("opening log file {}", sendingTag);
- changeState(sendingTag, State.LOGGED, State.SENDING);
- sendingCount.incrementAndGet();
- File curFile = getFile(sendingTag);
- EventSource curSource = new SeqfileEventSource(curFile.getAbsolutePath());
- return new StateChangeDeco(curSource, sendingTag);
+ synchronized (lock) {
+ LOG.info("opening log file {}", sendingTag);
+ changeState(sendingTag, State.LOGGED, State.SENDING);
+ sendingCount.incrementAndGet();
+ File curFile = getFile(sendingTag);
+ EventSource curSource = new SeqfileEventSource(curFile.getAbsolutePath());
+ return new StateChangeDeco(curSource, sendingTag);
+ }
}
/**
* Convert a tag from Sent to end-to-end acked.
*/
- synchronized public void toAcked(String tag) throws IOException {
- changeState(tag, State.SENT, State.E2EACKED);
- ackedCount.incrementAndGet();
+ public void toAcked(String tag) throws IOException {
+ synchronized (lock) {
+ WALData wd = getWalData(tag);
+ if (wd.s == State.LOGGED && wd.getSentCount() > 0) {
+ changeState(tag, State.LOGGED, State.E2EACKED);
+ ackedCount.incrementAndGet();
+
+ } else if (wd.s == State.SENT) {
+ changeState(tag, State.SENT, State.E2EACKED);
+ ackedCount.incrementAndGet();
+ }
+
+ }
}
/**
* Change something that is sent and not acknowledged to logged state so that
* the normal mechanisms will eventually retry sending it.
*/
- synchronized public void retry(String tag) throws IOException {
- // Yuck. This is like a CAS right now.
- WALData data = table.get(tag);
- if (data == null) {
- // wrong WALManager
- return;
- }
- if (data != null) {
- switch (data.s) {
- case SENDING: {
- // This is possible if a connection goes down. If we are currently
- // sending this group, we should continue trying to send (no need to
- // restarting it by demoting to LOGGED)
- LOG.info("Attempt to retry chunk in SENDING state. Data is being sent so "
- + "there is no need for state transition.");
- break;
- }
- case LOGGED: {
- // This is likely the most common case where we retry events spooled to
- // disk
- LOG.info("Attempt to retry chunk in LOGGED state. There is no need "
- + "for state transition.");
- break;
- }
- case SENT: {
- // This is possible if the collector goes down or if endpoint (HDFS)
- // goes down. Here we demote the chunk back to LOGGED state.
- changeState(tag, State.SENT, State.LOGGED);
- retryCount.incrementAndGet();
- break;
- }
- case E2EACKED: {
- // This is possible but very unlikely. If a group is in this state it is
- // about to be deleted and thus doesn't need a state transition.
- LOG.debug("Attemp to retry chunk in E2EACKED state. There is no "
- + "need to retry because data is acked.");
- break;
- }
-
- case ERROR: // should never happen
- LOG.info("Attempt to retry chunk in ERROR state. Data in ERROR "
- + "state stays in ERROR state so no transition.");
- break;
-
- case IMPORT: // should never happen
- case WRITING: // should never happen
- default: {
- String msg = "Attempting to retry from a state " + data.s
- + " which is a state do not ever retry from.";
- LOG.error(msg);
- throw new IllegalStateException(msg);
+
+ public void retry(String tag) throws IOException {
+ synchronized (lock) {
+ // Yuck. This is like a CAS right now.
+ WALData data = table.get(tag);
+ if (data == null) {
+ // wrong WALManager
+ return;
}
+ if (data != null) {
+ switch (data.s) {
+ case SENDING: {
+ // This is possible if a connection goes down. If we are currently
+ // sending this group, we should continue trying to send (no need to
+ // restarting it by demoting to LOGGED)
+ LOG.info("Attempt to retry chunk '" + tag
+ + "' in SENDING state. Data is being sent so "
+ + "there is no need for state transition.");
+ break;
+ }
+ case LOGGED: {
+ // This is likely the most common case where we retry events spooled
+ // to
+ // disk
+ LOG.info("Attempt to retry chunk '" + tag
+ + "' in LOGGED state. There is no need "
+ + "for state transition.");
+ break;
+ }
+ case SENT: {
+ // This is possible if the collector goes down or if endpoint (HDFS)
+ // goes down. Here we demote the chunk back to LOGGED state.
+ changeState(tag, State.SENT, State.LOGGED);
+ retryCount.incrementAndGet();
+ break;
+ }
+ case E2EACKED: {
+ // This is possible but very unlikely. If a group is in this state it
+ // is
+ // about to be deleted and thus doesn't need a state transition.
+ LOG.debug("Attempt to retry chunk '" + tag
+ + "' in E2EACKED state. There is no "
+ + "need to retry because data is acked.");
+ break;
+ }
+
+ case ERROR: // should never happen
+ LOG.info("Attempt to retry chunk '" + tag
+ + "' from ERROR state. Data in ERROR "
+ + "state stays in ERROR state so no transition.");
+ break;
+
+ case IMPORT: // should never happen
+ case WRITING: // should never happen
+ default: {
+ String msg = "Attempt to retry from a state " + data.s
+ + " which is a state do not ever retry from.";
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
}
}
}
@@ -848,7 +906,7 @@ public class NaiveFileWALManager impleme
// add to logging queue
WALData data = WALData.recovered(fn);
- synchronized (this) {
+ synchronized (lock) {
table.put(fn, data);
loggedQ.add(fn);
importedCount.incrementAndGet();
@@ -862,7 +920,7 @@ public class NaiveFileWALManager impleme
}
@Override
- synchronized public ReportEvent getMetrics() {
+ public ReportEvent getMetrics() {
ReportEvent rpt = new ReportEvent(getName());
// historical counts
@@ -893,9 +951,16 @@ public class NaiveFileWALManager impleme
}
@Override
- synchronized public boolean isEmpty() {
- return writingQ.isEmpty() && loggedQ.isEmpty() && sendingQ.isEmpty()
- && sentQ.isEmpty();
+ public boolean isEmpty() {
+ synchronized (lock) {
+ return writingQ.isEmpty() && loggedQ.isEmpty() && sendingQ.isEmpty()
+ && sentQ.isEmpty();
+ }
+ }
+
+ // Exposed for testing
+ WALData getWalData(String tag) {
+ return table.get(tag);
}
}
Added: incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java
URL: http://svn.apache.org/viewvc/incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java?rev=1159825&view=auto
==============================================================================
--- incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java (added)
+++ incubator/flume/trunk/flume-core/src/test/java/com/cloudera/flume/agent/durability/TestFlumeNodeWALNotifierRacy.java Sat Aug 20 00:48:50 2011
@@ -0,0 +1,360 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.cloudera.flume.agent.durability;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.cloudera.flume.agent.FlumeNodeWALNotifier;
+import com.cloudera.flume.agent.durability.NaiveFileWALManager.WALData;
+import com.cloudera.flume.core.EventImpl;
+import com.cloudera.flume.core.EventSink;
+import com.cloudera.flume.core.EventSource;
+import com.cloudera.flume.handlers.endtoend.AckListener;
+import com.cloudera.flume.handlers.rolling.Tagger;
+import com.cloudera.util.Clock;
+import com.cloudera.util.FileUtil;
+
+/**
+ * This test case exercises all the state transitions found when data goes
+ * through the write ahead log and comes out the other side.
+ */
+public class TestFlumeNodeWALNotifierRacy {
+ Logger LOG = LoggerFactory.getLogger(TestFlumeNodeWALNotifierRacy.class);
+
+ Date date;
+ CannedTagger tagger;
+ AckListener mockAl;
+ File tmpdir;
+ NaiveFileWALManager walman;
+ Map<String, WALManager> map;
+ EventSink snk;
+ EventSource src;
+
+ /**
+ * This issues simple incrementing integer toString as a tag for the next wal
+ * file.
+ */
+ static class CannedTagger implements Tagger {
+ int cur = 0;
+ List<String> tags = Collections.synchronizedList(new ArrayList<String>());
+
+ CannedTagger() {
+ }
+
+ @Override
+ public String getTag() {
+ return Integer.toString(cur);
+ }
+
+ @Override
+ public String newTag() {
+ cur++;
+ String tag = Integer.toString(cur);
+ tags.add(tag);
+ return tag;
+ }
+
+ @Override
+ public Date getDate() {
+ return null;
+ }
+
+ List<String> getTags() {
+ return tags;
+ }
+ }
+
+ @Before
+ public void setup() throws IOException, InterruptedException {
+ date = new Date();
+
+ tagger = new CannedTagger();
+ mockAl = mock(AckListener.class);
+
+ tmpdir = FileUtil.mktempdir();
+ walman = new NaiveFileWALManager(tmpdir);
+ walman.open();
+
+ map = new HashMap<String, WALManager>();
+ map.put("wal", walman);
+ }
+
+ @After
+ public void teardown() throws IOException {
+ FileUtil.rmr(tmpdir);
+ }
+
+ /**
+ * Attempt a retry on the specified tag. It is important that there is no
+ * synchronization on this -- concurrency needs to be handled by the wal
+ * sources and walmanager code.
+ *
+ * @param tag
+ * @throws IOException
+ */
+ public void triggerRetry(String tag) throws IOException {
+ FlumeNodeWALNotifier notif = new FlumeNodeWALNotifier(map);
+ notif.retry(tag);
+ }
+
+ /**
+ * Transition to writing state.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void toWritingState() throws IOException, InterruptedException {
+ snk = walman.newAckWritingSink(tagger, mockAl);
+ EventImpl evt = new EventImpl("foofoodata".getBytes());
+ snk.open();
+ snk.append(evt);
+ }
+
+ /**
+ * Transition from writing to logged state
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void toLoggedState() throws IOException, InterruptedException {
+ // transition to logged state.
+ snk.close();
+ }
+
+ /**
+ * Transition from logged to sending state.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void toSendingState() throws IOException, InterruptedException {
+ // transition to sending state.
+ src = walman.getUnackedSource();
+ src.open();
+ while (src.next() != null) {
+ ;
+ }
+ }
+
+ /**
+ * Transition from sending to sent state.
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ void toSentState() throws IOException, InterruptedException {
+ // transition to sent state.
+ src.close();
+ }
+
+ /**
+ * Transition from sent to acked state
+ *
+ * @param tag
+ * @throws IOException
+ */
+ void toAckedState(String tag) throws IOException {
+ // transition to acked state.
+ walman.toAcked(tag);
+ }
+
+ /**
+ * Make one state transition based on the current state of the wal tag. The
+ * key point here is that even though there is no locking here, retries can
+ * happen anywhere between state transitions and the data still only makes
+ * proper state transitions.
+ *
+ * @param tag
+ * @return isDone (either in E2EACKED state, or no longer present)
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ boolean step(String tag) throws IOException, InterruptedException {
+ WALData wd = walman.getWalData(tag);
+ if (wd == null) {
+ return true;
+ }
+ switch (wd.s) {
+ case WRITING:
+ LOG.info("LOGGED tag '" + tag + "'");
+ toLoggedState();
+ return false;
+ case LOGGED:
+ LOG.info("SENDING tag '" + tag + "'");
+ toSendingState();
+ return false;
+ case SENDING:
+ LOG.info("SENT tag '" + tag + "'");
+ toSentState();
+ return false;
+ case SENT:
+ LOG.info("ACKED tag '" + tag + "'");
+ toAckedState(tag);
+ return false;
+ case E2EACKED:
+ return true;
+ default:
+ throw new IllegalStateException("Unexpected state " + wd.s);
+ }
+ }
+
+ /**
+ * Run the test for 10000 wal files. On laptop this normally finishes in 20s,
+ * so timeout after 100s (100000ms).
+ *
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @Test(timeout = 100000)
+ public void testRetryWriting() throws IOException, InterruptedException {
+ final int count = 10000;
+ retryWritingRacynessRun(count);
+ }
+
+ /**
+ * The test manages two threads -- one thread marching through states and
+ * another that introduces many, many retry attempts.
+ *
+ * @param count
+ * @throws InterruptedException
+ */
+ void retryWritingRacynessRun(final int count) throws InterruptedException {
+ // one for each thread
+ final CountDownLatch done = new CountDownLatch(2);
+ // start second thread
+ final CountDownLatch canRetry = new CountDownLatch(1);
+
+ // Thread 1 is creating new log data
+ Thread t = new Thread() {
+ public void run() {
+ try {
+ for (int i = 0; i < count; i++) {
+ // there is an off by one thing here
+ String tag = Integer.toString(i + 1);
+ LOG.info("WRITING tag '" + tag + "'");
+ toWritingState();
+ canRetry.countDown();
+ // tag = tagger.getTags().get(i);
+ // first time will allow retry thread to start, otherwise do nothing
+ while (!step(tag)) {
+ LOG.info("Stepping on tag " + tag);
+ }
+
+ }
+ } catch (Exception e) {
+ LOG.error("This wasn't supposed to happen", e);
+ } finally {
+ done.countDown();
+ }
+ }
+ };
+ t.start();
+
+ // thread 2 is periodically triggering retries.
+ Thread t2 = new Thread() {
+ public void run() {
+ int count = 0;
+ try {
+ canRetry.await();
+ while (done.getCount() > 1) {
+ List<String> tags = tagger.getTags();
+ Clock.sleep(10);
+
+ // Key point -- read of the tag and the retry don't clash with other
+ // threads state transition
+ // synchronized (lock) {
+ int sz = tags.size();
+ for (int i = 0; i < sz; i++) {
+ String tag = tags.get(i);
+ WALData wd = walman.getWalData(tag);
+ if (!walman.getWritingTags().contains(tag) && wd != null) {
+ // any state but writing
+ triggerRetry(tag);
+ LOG.info("Forcing retry on tag '" + tag + "'");
+ count++;
+ }
+ // }
+
+ }
+ }
+ } catch (Exception e) {
+ LOG.error("This wasn't supposed to happen either", e);
+ } finally {
+ done.countDown();
+ LOG.info("Issued {} retries", count);
+ }
+ }
+ };
+ t2.start();
+
+ done.await();
+
+ assertEquals(0, walman.getWritingTags().size());
+ assertEquals(0, walman.getLoggedTags().size());
+ assertEquals(0, walman.getSendingTags().size());
+ assertEquals(0, walman.getSentTags().size());
+ }
+
+ /**
+ * You can run this test from the command line for an extended racyness
+ * testing.
+ */
+ public static void main(String[] argv) {
+ if (argv.length != 1) {
+ System.err.println("usage: "
+ + TestFlumeNodeWALNotifierRacy.class.getSimpleName() + " <n>");
+ System.err
+ .println(" Run wal racyness test sending <n> events. This rolls "
+ + "a new log file after every event and also attempts to "
+ + "a retry every 10ms.");
+ System.exit(1);
+ }
+ int count = Integer.parseInt(argv[0]);
+
+ TestFlumeNodeWALNotifierRacy test = new TestFlumeNodeWALNotifierRacy();
+ try {
+ test.setup();
+ test.retryWritingRacynessRun(count);
+ test.teardown();
+ } catch (Exception e) {
+ System.err.println("Test failed");
+ e.printStackTrace();
+ System.exit(2);
+ }
+ // success!
+ }
+}