You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/28 08:30:41 UTC

svn commit: r1225130 - in /incubator/flume/branches/flume-897/wal: wal-avro/src/main/java/org/apache/wal/avro/ wal-avro/src/test/java/org/apache/wal/avro/ wal-core/src/main/java/org/apache/wal/ wal-core/src/test/java/org/apache/wal/

Author: esammer
Date: Wed Dec 28 07:30:40 2011
New Revision: 1225130

URL: http://svn.apache.org/viewvc?rev=1225130&view=rev
Log:
- Added open / close methods to WALReader interface to be consistent with writer.
- Fleshed out AvroWAL (partially working).
- Implemented a test for AvroWAL (mostly works).
- Updated the default event limit for AvroWALWriter to 100k instead of 100.

Added:
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java
Modified:
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
    incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java Wed Dec 28 07:30:40 2011
@@ -3,63 +3,61 @@ package org.apache.wal.avro;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
 import java.util.Map;
 
 import org.apache.wal.WAL;
+import org.apache.wal.WALException;
 import org.apache.wal.WALReader;
 import org.apache.wal.WALWriter;
 
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-
 public class AvroWAL implements WAL {
 
   private File directory;
 
-  private RandomAccessFile walRAF;
-
-  private MappedByteBuffer indexBuffer;
+  private WALIndex index;
 
   @Override
   public void configure(Map<String, String> properties) {
-    // TODO Auto-generated method stub
+    index = new WALIndex();
 
+    directory = new File(properties.get("directory"));
+
+    index.setDirectory(directory);
   }
 
   @Override
   public void open() {
     try {
-      indexBuffer = (MappedByteBuffer) Files.map(new File(directory, "index"),
-          FileChannel.MapMode.READ_WRITE, 8);
-
-      walRAF = new RandomAccessFile(new File(directory, "data.wal"), "rwd");
+      index.open();
     } catch (FileNotFoundException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      throw new WALException("Unable to open WAL index. Exception follows.", e);
     } catch (IOException e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
+      throw new WALException("Unable to open WAL index. Exception follows.", e);
     }
   }
 
   @Override
   public void close() {
-    Closeables.closeQuietly(walRAF);
   }
 
   @Override
   public WALReader getReader() {
-    // TODO Auto-generated method stub
-    return null;
+    AvroWALReader reader = new AvroWALReader();
+
+    reader.setDirectory(directory);
+    reader.setIndex(index);
+
+    return reader;
   }
 
   @Override
   public WALWriter getWriter() {
-    // TODO Auto-generated method stub
-    return null;
+    AvroWALWriter writer = new AvroWALWriter();
+
+    writer.setDirectory(directory);
+    writer.setIndex(index);
+
+    return writer;
   }
 
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java Wed Dec 28 07:30:40 2011
@@ -45,6 +45,7 @@ public class AvroWALReader implements WA
     reader = new SpecificDatumReader<AvroWALEntry>(AvroWALEntry.class);
   }
 
+  @Override
   public void open() {
     logger.info("Opening write ahead log reader for directory:{}", directory);
 
@@ -61,6 +62,11 @@ public class AvroWALReader implements WA
     logger.debug("Opened write ahead log reader:{}", this);
   }
 
+  @Override
+  public void close() {
+    closeWALFile(); // delegates to closeWALFile(), defined outside this hunk
+  }
+
   private boolean ensureWALFile() {
     if (decoder == null) {
       File file = findNextFile();
@@ -170,14 +176,15 @@ public class AvroWALReader implements WA
     if (ensureWALFile()) {
       while (entry == null) {
         try {
-          entry = new AvroWALEntryAdapter(reader.read(null, decoder));
-          currentPosition = inputChannel.position();
-
-          if (logger.isDebugEnabled()) {
-            logger
-                .debug("Read entry:{} markPosition:{} currentPosition:{}",
-                    new Object[] { entry, index.getReadPosition(),
-                        currentPosition });
+          if (dataAvailable()) {
+            entry = new AvroWALEntryAdapter(reader.read(null, decoder));
+            currentPosition = inputChannel.position();
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("Read entry:{} markPosition:{} currentPosition:{}",
+                  new Object[] { entry, index.getReadPosition(),
+                      currentPosition });
+            }
           }
         } catch (EOFException e) {
           /*
@@ -201,9 +208,21 @@ public class AvroWALReader implements WA
     return entry;
   }
 
+  /* Readable if not on the index's write file, or behind its write position. */
+  private boolean dataAvailable() {
+    synchronized (index) {
+      boolean onWriteFile = currentFile.getPath().equals(
+          index.getWriteFile());
+
+      // The position is only compared when reading the index's write file.
+      return !onWriteFile || currentPosition < index.getWritePosition();
+    }
+  }
+
   @Override
   public void mark() {
-    logger.debug("Updating currentPosition to:{}", currentPosition);
+    logger.debug("Marking currentFile:{} currentPosition:{}", currentFile,
+        currentPosition);
 
     index.updateReadIndex(currentFile.getPath(), currentPosition);
   }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java Wed Dec 28 07:30:40 2011
@@ -23,7 +23,7 @@ public class AvroWALWriter implements WA
 
   private static final Logger logger = LoggerFactory
       .getLogger(AvroWALWriter.class);
-  private static final int defaultEventLimit = 100;
+  private static final int defaultEventLimit = 100000;
 
   private File directory;
   private int eventLimit;

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java?rev=1225130&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java Wed Dec 28 07:30:40 2011
@@ -0,0 +1,157 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.wal.WALEntry;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+/**
+ * Concurrent read / write test for {@link AvroWAL}: one thread appends entries
+ * while a second thread consumes them through a reader on the same WAL.
+ */
+public class TestAvroWAL {
+
+  /** Number of entries the writer appends and the reader must consume. */
+  private static final int ENTRY_COUNT = 5000;
+
+  /** Upper bound, in seconds, on how long the test waits for each worker. */
+  private static final long TIMEOUT_SECONDS = 60;
+
+  private File testDirectory;
+  private AvroWAL wal;
+
+  @Before
+  public void setUp() {
+    // Resolve against the platform temp directory instead of assuming /tmp.
+    testDirectory = new File(System.getProperty("java.io.tmpdir"),
+        "wal-avro-" + System.currentTimeMillis());
+
+    Assert.assertTrue("Unable to create " + testDirectory,
+        testDirectory.mkdirs());
+
+    wal = new AvroWAL();
+
+    Map<String, String> conf = new HashMap<String, String>();
+
+    conf.put("directory", testDirectory.getPath());
+    wal.configure(conf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Files.deleteRecursively(testDirectory.getCanonicalFile());
+  }
+
+  @Test
+  public void testReadWrite() throws Exception {
+    wal.open();
+
+    final AvroWALReader reader = (AvroWALReader) wal.getReader();
+    final AvroWALWriter writer = (AvroWALWriter) wal.getWriter();
+
+    // Reports how many entries were actually read so the test can assert on it.
+    Callable<Integer> readerTask = new Callable<Integer>() {
+
+      @Override
+      public Integer call() throws InterruptedException {
+        long deadline = System.currentTimeMillis()
+            + TimeUnit.SECONDS.toMillis(TIMEOUT_SECONDS);
+        int entriesRead = 0;
+
+        reader.open();
+
+        try {
+          while (entriesRead < ENTRY_COUNT
+              && System.currentTimeMillis() < deadline) {
+            WALEntry entry = reader.next();
+
+            if (entry == null) {
+              // Nothing to read yet: back off without counting the attempt.
+              Thread.sleep(10);
+              continue;
+            }
+
+            entriesRead++;
+
+            if (entriesRead % 1000 == 0) {
+              reader.mark();
+            }
+          }
+
+          // Mark only after at least one successful read.
+          if (entriesRead > 0) {
+            reader.mark();
+          }
+        } finally {
+          reader.close();
+        }
+
+        return entriesRead;
+      }
+
+    };
+
+    Callable<Void> writerTask = new Callable<Void>() {
+
+      @Override
+      public Void call() {
+        writer.open();
+
+        try {
+          for (int i = 0; i < ENTRY_COUNT; i++) {
+            AvroWALEntryAdapter entry = new AvroWALEntryAdapter(
+                new AvroWALEntry());
+
+            entry.getEntry().setTimeStamp((long) i);
+
+            writer.write(entry);
+
+            if (i % 100 == 0) {
+              writer.mark();
+            }
+          }
+
+          writer.mark();
+        } finally {
+          writer.close();
+        }
+
+        return null;
+      }
+
+    };
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    try {
+      Future<Void> writerResult = executor.submit(writerTask);
+      Future<Integer> readerResult = executor.submit(readerTask);
+
+      // get() rethrows whatever a worker threw, so failures inside the
+      // threads now fail the test instead of being dropped with the Future.
+      writerResult.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      int entriesRead = readerResult.get(TIMEOUT_SECONDS, TimeUnit.SECONDS)
+          .intValue();
+
+      Assert.assertEquals("Reader did not consume every written entry",
+          ENTRY_COUNT, entriesRead);
+    } finally {
+      executor.shutdownNow();
+      wal.close();
+    }
+  }
+}

Modified: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java Wed Dec 28 07:30:40 2011
@@ -2,8 +2,12 @@ package org.apache.wal;
 
 public interface WALReader {
 
+  public void open();
+
   public WALEntry next();
 
+  public void close();
+
   public void mark();
 
   public void reset();

Modified: incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java Wed Dec 28 07:30:40 2011
@@ -111,11 +111,21 @@ public class TestWAL {
       private MockWAL wal;
 
       @Override
+      public void open() {
+
+      }
+
+      @Override
       public WALEntry next() {
         return wal.next();
       }
 
       @Override
+      public void close() {
+
+      }
+
+      @Override
       public void mark() {
 
       }