You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/31 11:01:55 UTC

svn commit: r1226054 - in /incubator/flume/branches/flume-897/wal/wal-avro/src: main/java/org/apache/wal/avro/WALIndex.java test/java/org/apache/wal/avro/TestWALIndex.java

Author: esammer
Date: Sat Dec 31 10:01:54 2011
New Revision: 1226054

URL: http://svn.apache.org/viewvc?rev=1226054&view=rev
Log:
- Added explicit separate read / write locks for WALIndex to lessen monitor contention.
- Added a brute force deadlock test.

Modified:
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java?rev=1226054&r1=1226053&r2=1226054&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java Sat Dec 31 10:01:54 2011
@@ -5,11 +5,13 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.io.Files;
 
 public class WALIndex {
@@ -26,68 +28,95 @@ public class WALIndex {
   private File readIndexFile;
   private MappedByteBuffer readIndexBuffer;
 
+  private ReentrantLock readLock;
+  private ReentrantLock writeLock;
+
   private String writeFile;
   private long writePosition;
   private String readFile;
   private long readPosition;
 
   public WALIndex() {
+    readLock = new ReentrantLock();
+    writeLock = new ReentrantLock();
   }
 
   public synchronized void open() throws FileNotFoundException, IOException {
-    writeIndexFile = new File(directory, writeIndexFileName);
-    readIndexFile = new File(directory, readIndexFileName);
-
-    logger.info("Opening WAL index table writeFile:{}", writeIndexFile);
-
-    writeIndexBuffer = Files.map(writeIndexFile,
-        FileChannel.MapMode.READ_WRITE, 4 * 1024);
-    readIndexBuffer = Files.map(readIndexFile, FileChannel.MapMode.READ_WRITE,
-        4 * 1024);
-
-    writePosition = writeIndexBuffer.getLong();
-    int writeFileNameLength = writeIndexBuffer.getInt();
 
-    if (writeFileNameLength > 0) {
-      byte[] buffer = new byte[writeFileNameLength];
-      writeIndexBuffer.get(buffer);
-      writeFile = new String(buffer);
+    try {
+      writeLock.lock();
+      writeIndexFile = new File(directory, writeIndexFileName);
+
+      logger.info("Opening WAL index table writeFile:{}", writeIndexFile);
+
+      writeIndexBuffer = Files.map(writeIndexFile,
+          FileChannel.MapMode.READ_WRITE, 4 * 1024);
+
+      writePosition = writeIndexBuffer.getLong();
+      int writeFileNameLength = writeIndexBuffer.getInt();
+
+      if (writeFileNameLength > 0) {
+        byte[] buffer = new byte[writeFileNameLength];
+        writeIndexBuffer.get(buffer);
+        writeFile = new String(buffer);
+      }
+      writeIndexBuffer.position(0);
+    } finally {
+      writeLock.unlock();
     }
 
-    writeIndexBuffer.position(0);
-
-    readPosition = readIndexBuffer.getLong();
-    int readFileNameLength = readIndexBuffer.getInt();
-
-    if (readFileNameLength > 0) {
-      byte[] buffer = new byte[readFileNameLength];
-      readIndexBuffer.get(buffer);
-      readFile = new String(buffer);
+    try {
+      readLock.lock();
+      readIndexFile = new File(directory, readIndexFileName);
+
+      readIndexBuffer = Files.map(readIndexFile,
+          FileChannel.MapMode.READ_WRITE, 4 * 1024);
+
+      readPosition = readIndexBuffer.getLong();
+      int readFileNameLength = readIndexBuffer.getInt();
+
+      if (readFileNameLength > 0) {
+        byte[] buffer = new byte[readFileNameLength];
+        readIndexBuffer.get(buffer);
+        readFile = new String(buffer);
+      }
+
+      readIndexBuffer.position(0);
+    } finally {
+      readLock.unlock();
     }
 
-    readIndexBuffer.position(0);
-
     logger.debug("Loaded index:{}", this);
   }
 
-  public synchronized void updateWriteIndex(String file, long position) {
-    writeIndexBuffer.putLong(position).putInt(file.length())
-        .put(file.getBytes());
-    writeIndexBuffer.force();
-    writeIndexBuffer.position(0);
-
-    this.writeFile = file;
-    this.writePosition = position;
-  }
-
-  public synchronized void updateReadIndex(String file, long position) {
-    readIndexBuffer.putLong(position).putInt(file.length())
-        .put(file.getBytes());
-    readIndexBuffer.force();
-    readIndexBuffer.position(0);
+  public void updateWriteIndex(String file, long position) {
+    try {
+      writeLock.lock();
+      writeIndexBuffer.putLong(position).putInt(file.length())
+          .put(file.getBytes());
+      writeIndexBuffer.force();
+      writeIndexBuffer.position(0);
+
+      this.writeFile = file;
+      this.writePosition = position;
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
-    this.readFile = file;
-    this.readPosition = position;
+  public void updateReadIndex(String file, long position) {
+    try {
+      readLock.lock();
+      readIndexBuffer.putLong(position).putInt(file.length())
+          .put(file.getBytes());
+      readIndexBuffer.force();
+      readIndexBuffer.position(0);
+
+      this.readFile = file;
+      this.readPosition = position;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public synchronized File getDirectory() {
@@ -99,43 +128,64 @@ public class WALIndex {
   }
 
   public String getWriteFile() {
-    return writeFile;
-  }
-
-  public void setWriteFile(String file) {
-    this.writeFile = file;
+    try {
+      writeLock.lock();
+      return writeFile;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public long getWritePosition() {
-    return writePosition;
-  }
-
-  public void setWritePosition(long position) {
-    this.writePosition = position;
+    try {
+      writeLock.lock();
+      return writePosition;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
   public String getReadFile() {
-    return readFile;
-  }
-
-  public void setReadFile(String readFile) {
-    this.readFile = readFile;
+    try {
+      readLock.lock();
+      return readFile;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public long getReadPosition() {
-    return readPosition;
-  }
-
-  public void setReadPosition(long readPosition) {
-    this.readPosition = readPosition;
+    try {
+      readLock.lock();
+      return readPosition;
+    } finally {
+      readLock.unlock();
+    }
   }
 
   @Override
-  public String toString() {
-    return Objects.toStringHelper(getClass()).add("writeFile", writeFile)
-        .add("writePosition", writePosition).add("readFile", readFile)
-        .add("readPosition", readPosition).add("directory", directory)
-        .add("writeIndexFile", writeIndexFile)
-        .add("readIndexFile", readIndexFile).toString();
+  public synchronized String toString() {
+    ToStringHelper stringHelper = Objects.toStringHelper(getClass()).add(
+        "directory", directory);
+
+    try {
+      writeLock.lock();
+      stringHelper.add("writeFile", writeFile)
+          .add("writePosition", writePosition)
+          .add("writeIndexFile", writeIndexFile);
+    } finally {
+      writeLock.unlock();
+    }
+
+    try {
+      readLock.lock();
+      stringHelper.add("readFile", readFile).add("readPosition", readPosition)
+          .add("readIndexFile", readIndexFile);
+    } finally {
+      readLock.unlock();
+    }
+
+    return stringHelper.toString();
   }
+
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java?rev=1226054&r1=1226053&r2=1226054&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java Sat Dec 31 10:01:54 2011
@@ -3,17 +3,26 @@ package org.apache.wal.avro;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Files;
 
 public class TestWALIndex {
 
+  private static final Logger logger = LoggerFactory
+      .getLogger(TestWALIndex.class);
+
   private File testDirectory;
   private WALIndex index;
 
@@ -102,4 +111,40 @@ public class TestWALIndex {
     Assert.assertEquals(2, index.getReadPosition());
   }
 
+  @Test
+  public void testBruteForceDeadlockDetect() throws FileNotFoundException,
+      IOException, InterruptedException {
+
+    index.open();
+
+    ExecutorService executor = Executors.newFixedThreadPool(8);
+    final Random random = new Random();
+
+    for (int i = 0; i < 50000; i++) {
+      final long count = i;
+
+      executor.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          if (count % 1000 == 0) {
+            logger.debug("count:{}", count);
+          }
+
+          if (random.nextBoolean()) {
+            index.updateReadIndex("a", count);
+          } else {
+            index.updateWriteIndex("a", count);
+          }
+        }
+      });
+    }
+
+    executor.shutdown();
+
+    while (!executor.isTerminated()) {
+      executor.awaitTermination(1, TimeUnit.SECONDS);
+    }
+  }
+
 }