You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/31 11:01:55 UTC
svn commit: r1226054 - in
/incubator/flume/branches/flume-897/wal/wal-avro/src:
main/java/org/apache/wal/avro/WALIndex.java
test/java/org/apache/wal/avro/TestWALIndex.java
Author: esammer
Date: Sat Dec 31 10:01:54 2011
New Revision: 1226054
URL: http://svn.apache.org/viewvc?rev=1226054&view=rev
Log:
- Added explicit, separate read and write locks to WALIndex to reduce monitor contention.
- Added a brute-force deadlock test.
Modified:
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java?rev=1226054&r1=1226053&r2=1226054&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java Sat Dec 31 10:01:54 2011
@@ -5,11 +5,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
+import com.google.common.base.Objects.ToStringHelper;
import com.google.common.io.Files;
public class WALIndex {
@@ -26,68 +28,95 @@ public class WALIndex {
private File readIndexFile;
private MappedByteBuffer readIndexBuffer;
+ private ReentrantLock readLock;
+ private ReentrantLock writeLock;
+
private String writeFile;
private long writePosition;
private String readFile;
private long readPosition;
public WALIndex() {
+ readLock = new ReentrantLock();
+ writeLock = new ReentrantLock();
}
public synchronized void open() throws FileNotFoundException, IOException {
- writeIndexFile = new File(directory, writeIndexFileName);
- readIndexFile = new File(directory, readIndexFileName);
-
- logger.info("Opening WAL index table writeFile:{}", writeIndexFile);
-
- writeIndexBuffer = Files.map(writeIndexFile,
- FileChannel.MapMode.READ_WRITE, 4 * 1024);
- readIndexBuffer = Files.map(readIndexFile, FileChannel.MapMode.READ_WRITE,
- 4 * 1024);
-
- writePosition = writeIndexBuffer.getLong();
- int writeFileNameLength = writeIndexBuffer.getInt();
- if (writeFileNameLength > 0) {
- byte[] buffer = new byte[writeFileNameLength];
- writeIndexBuffer.get(buffer);
- writeFile = new String(buffer);
+ try {
+ writeLock.lock();
+ writeIndexFile = new File(directory, writeIndexFileName);
+
+ logger.info("Opening WAL index table writeFile:{}", writeIndexFile);
+
+ writeIndexBuffer = Files.map(writeIndexFile,
+ FileChannel.MapMode.READ_WRITE, 4 * 1024);
+
+ writePosition = writeIndexBuffer.getLong();
+ int writeFileNameLength = writeIndexBuffer.getInt();
+
+ if (writeFileNameLength > 0) {
+ byte[] buffer = new byte[writeFileNameLength];
+ writeIndexBuffer.get(buffer);
+ writeFile = new String(buffer);
+ }
+ writeIndexBuffer.position(0);
+ } finally {
+ writeLock.unlock();
}
- writeIndexBuffer.position(0);
-
- readPosition = readIndexBuffer.getLong();
- int readFileNameLength = readIndexBuffer.getInt();
-
- if (readFileNameLength > 0) {
- byte[] buffer = new byte[readFileNameLength];
- readIndexBuffer.get(buffer);
- readFile = new String(buffer);
+ try {
+ readLock.lock();
+ readIndexFile = new File(directory, readIndexFileName);
+
+ readIndexBuffer = Files.map(readIndexFile,
+ FileChannel.MapMode.READ_WRITE, 4 * 1024);
+
+ readPosition = readIndexBuffer.getLong();
+ int readFileNameLength = readIndexBuffer.getInt();
+
+ if (readFileNameLength > 0) {
+ byte[] buffer = new byte[readFileNameLength];
+ readIndexBuffer.get(buffer);
+ readFile = new String(buffer);
+ }
+
+ readIndexBuffer.position(0);
+ } finally {
+ readLock.unlock();
}
- readIndexBuffer.position(0);
-
logger.debug("Loaded index:{}", this);
}
- public synchronized void updateWriteIndex(String file, long position) {
- writeIndexBuffer.putLong(position).putInt(file.length())
- .put(file.getBytes());
- writeIndexBuffer.force();
- writeIndexBuffer.position(0);
-
- this.writeFile = file;
- this.writePosition = position;
- }
-
- public synchronized void updateReadIndex(String file, long position) {
- readIndexBuffer.putLong(position).putInt(file.length())
- .put(file.getBytes());
- readIndexBuffer.force();
- readIndexBuffer.position(0);
+ public void updateWriteIndex(String file, long position) {
+ try {
+ writeLock.lock();
+ writeIndexBuffer.putLong(position).putInt(file.length())
+ .put(file.getBytes());
+ writeIndexBuffer.force();
+ writeIndexBuffer.position(0);
+
+ this.writeFile = file;
+ this.writePosition = position;
+ } finally {
+ writeLock.unlock();
+ }
+ }
- this.readFile = file;
- this.readPosition = position;
+ public void updateReadIndex(String file, long position) {
+ try {
+ readLock.lock();
+ readIndexBuffer.putLong(position).putInt(file.length())
+ .put(file.getBytes());
+ readIndexBuffer.force();
+ readIndexBuffer.position(0);
+
+ this.readFile = file;
+ this.readPosition = position;
+ } finally {
+ readLock.unlock();
+ }
}
public synchronized File getDirectory() {
@@ -99,43 +128,64 @@ public class WALIndex {
}
public String getWriteFile() {
- return writeFile;
- }
-
- public void setWriteFile(String file) {
- this.writeFile = file;
+ try {
+ writeLock.lock();
+ return writeFile;
+ } finally {
+ writeLock.unlock();
+ }
}
public long getWritePosition() {
- return writePosition;
- }
-
- public void setWritePosition(long position) {
- this.writePosition = position;
+ try {
+ writeLock.lock();
+ return writePosition;
+ } finally {
+ writeLock.unlock();
+ }
}
public String getReadFile() {
- return readFile;
- }
-
- public void setReadFile(String readFile) {
- this.readFile = readFile;
+ try {
+ readLock.lock();
+ return readFile;
+ } finally {
+ readLock.unlock();
+ }
}
public long getReadPosition() {
- return readPosition;
- }
-
- public void setReadPosition(long readPosition) {
- this.readPosition = readPosition;
+ try {
+ readLock.lock();
+ return readPosition;
+ } finally {
+ readLock.unlock();
+ }
}
@Override
- public String toString() {
- return Objects.toStringHelper(getClass()).add("writeFile", writeFile)
- .add("writePosition", writePosition).add("readFile", readFile)
- .add("readPosition", readPosition).add("directory", directory)
- .add("writeIndexFile", writeIndexFile)
- .add("readIndexFile", readIndexFile).toString();
+ public synchronized String toString() {
+ ToStringHelper stringHelper = Objects.toStringHelper(getClass()).add(
+ "directory", directory);
+
+ try {
+ writeLock.lock();
+ stringHelper.add("writeFile", writeFile)
+ .add("writePosition", writePosition)
+ .add("writeIndexFile", writeIndexFile);
+ } finally {
+ writeLock.unlock();
+ }
+
+ try {
+ readLock.lock();
+ stringHelper.add("readFile", readFile).add("readPosition", readPosition)
+ .add("readIndexFile", readIndexFile);
+ } finally {
+ readLock.unlock();
+ }
+
+ return stringHelper.toString();
}
+
}
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java?rev=1226054&r1=1226053&r2=1226054&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java Sat Dec 31 10:01:54 2011
@@ -3,17 +3,26 @@ package org.apache.wal.avro;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.common.io.Files;
public class TestWALIndex {
+ private static final Logger logger = LoggerFactory
+ .getLogger(TestWALIndex.class);
+
private File testDirectory;
private WALIndex index;
@@ -102,4 +111,40 @@ public class TestWALIndex {
Assert.assertEquals(2, index.getReadPosition());
}
+ @Test
+ public void testBruteForceDeadlockDetect() throws FileNotFoundException,
+ IOException, InterruptedException {
+
+ index.open();
+
+ ExecutorService executor = Executors.newFixedThreadPool(8);
+ final Random random = new Random();
+
+ for (int i = 0; i < 50000; i++) {
+ final long count = i;
+
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ if (count % 1000 == 0) {
+ logger.debug("count:{}", count);
+ }
+
+ if (random.nextBoolean()) {
+ index.updateReadIndex("a", count);
+ } else {
+ index.updateWriteIndex("a", count);
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+
+ while (!executor.isTerminated()) {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+ }
+
}