You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/28 08:30:16 UTC

svn commit: r1225126 - in /incubator/flume/branches/flume-897: ./ wal/wal-avro/src/main/avro/ wal/wal-avro/src/main/java/org/apache/wal/avro/ wal/wal-avro/src/test/java/org/apache/wal/avro/

Author: esammer
Date: Wed Dec 28 07:30:16 2011
New Revision: 1225126

URL: http://svn.apache.org/viewvc?rev=1225126&view=rev
Log:
- Refactoring WAL and extracting index maintenance into a separate class.

Added:
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
Modified:
    incubator/flume/branches/flume-897/pom.xml
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
    incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
    incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java

Modified: incubator/flume/branches/flume-897/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/pom.xml?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/pom.xml (original)
+++ incubator/flume/branches/flume-897/pom.xml Wed Dec 28 07:30:16 2011
@@ -461,7 +461,7 @@ limitations under the License.
       <dependency>
         <groupId>com.google.guava</groupId>
         <artifactId>guava</artifactId>
-        <version>r07</version>
+        <version>10.0.1</version>
       </dependency>
 
       <dependency>

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl Wed Dec 28 07:30:16 2011
@@ -6,4 +6,14 @@ protocol AvroWALProtocol {
     long timeStamp;
   }
 
+  record AvroWALIndexEntry {
+    long position;
+    string path;
+  }
+
+  record AvroWALIndex {
+    int version;
+    array<AvroWALIndexEntry> entries;
+  }
+
 }

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java Wed Dec 28 07:30:16 2011
@@ -1,11 +1,16 @@
 package org.apache.wal.avro;
 
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 
 import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.wal.WALEntry;
 import org.apache.wal.WALWriter;
@@ -13,32 +18,90 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
 
 public class AvroWALWriter implements WALWriter {
 
+  private static final short writeIndexVersion = 1;
   private static final Logger logger = LoggerFactory
       .getLogger(AvroWALWriter.class);
 
+  private File directory;
+
   private FileOutputStream walOutputStream;
   private MappedByteBuffer indexBuffer;
   private Encoder encoder;
   private SpecificDatumWriter<AvroWALEntry> writer;
 
+  private ByteArrayOutputStream indexOutputStream;
+  private Encoder indexEncoder;
+  private SpecificDatumWriter<AvroWALIndex> indexWriter;
+
+  private File currentFile;
   private long currentPosition;
   private FileChannel outputChannel;
 
   @Override
   public void open() {
-    outputChannel = walOutputStream.getChannel();
+    logger.info("Opening write ahead log in:{}", directory);
+
+    currentFile = new File(directory, System.currentTimeMillis() + ".wal");
 
     try {
-      indexBuffer.putLong(0, outputChannel.position());
+      walOutputStream = new FileOutputStream(currentFile, true);
+      outputChannel = walOutputStream.getChannel();
+      currentPosition = outputChannel.position();
+
+      indexOutputStream = new ByteArrayOutputStream(1024);
+      indexEncoder = EncoderFactory.get().jsonEncoder(AvroWALIndex.SCHEMA$,
+          indexOutputStream);
+      indexWriter = new SpecificDatumWriter<AvroWALIndex>(AvroWALIndex.class);
+
+      indexBuffer = Files.map(new File(directory, "write.idx"),
+          FileChannel.MapMode.READ_WRITE, 8 * 1024);
+
+      updateWriteIndex(currentPosition, currentFile.getPath());
+    } catch (FileNotFoundException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
     } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
     }
 
+    encoder = EncoderFactory.get().directBinaryEncoder(walOutputStream, null);
+    writer = new SpecificDatumWriter<AvroWALEntry>(AvroWALEntry.class);
+
+    logger.debug("Opened write ahead log:{} currentPosition:{}", currentFile,
+        currentPosition);
+  }
+
+  private void updateWriteIndex(long currentPosition, String path) {
+    logger.debug("Updating write index to position:{} path:{}",
+        currentPosition, path);
+
+    AvroWALIndex index = AvroWALIndex
+        .newBuilder()
+        .setVersion(writeIndexVersion)
+        .setEntries(
+            Arrays.asList(AvroWALIndexEntry.newBuilder()
+                .setPath(currentFile.getPath()).setPosition(currentPosition)
+                .build())).build();
+
+    indexOutputStream.reset();
+
+    try {
+      indexWriter.write(index, indexEncoder);
+      indexEncoder.flush();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+
+    indexBuffer.put(indexOutputStream.toByteArray());
     indexBuffer.force();
+    indexBuffer.flip();
   }
 
   @Override
@@ -49,7 +112,7 @@ public class AvroWALWriter implements WA
     try {
       writer.write(((AvroWALEntryAdapter) entry).getEntry(), encoder);
       encoder.flush();
-      outputChannel.force(false);
+      outputChannel.force(true);
       currentPosition = outputChannel.position();
 
       if (logger.isDebugEnabled()) {
@@ -63,12 +126,14 @@ public class AvroWALWriter implements WA
 
   @Override
   public void close() {
+    logger.info("Closing write ahead log at:{}", currentFile);
+
+    Closeables.closeQuietly(outputChannel);
   }
 
   @Override
   public void mark() {
-    indexBuffer.putLong(0, currentPosition);
-    indexBuffer.force();
+    updateWriteIndex(currentPosition, currentFile.getPath());
   }
 
   @Override

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java?rev=1225126&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java Wed Dec 28 07:30:16 2011
@@ -0,0 +1,51 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+public class WALIndex {
+
+  private static final String indexFileName = "wal-idx";
+  private static final Logger logger = LoggerFactory.getLogger(WALIndex.class);
+
+  private File directory;
+
+  private File indexFile;
+  private MappedByteBuffer indexBuffer;
+
+  public synchronized void open() throws FileNotFoundException, IOException {
+    indexFile = new File(directory, indexFileName);
+
+    indexBuffer = Files.map(indexFile, FileChannel.MapMode.READ_WRITE,
+        16 * 1024);
+  }
+
+  public synchronized void updateIndex(String file, long position) {
+    logger.info("Updating WAL index with file:{} position:{}", file, position);
+
+    indexBuffer.putLong(position).putInt(file.length()).put(file.getBytes());
+    indexBuffer.force();
+    indexBuffer.position(0);
+  }
+
+  public synchronized File getDirectory() {
+    return directory;
+  }
+
+  public synchronized void setDirectory(File directory) {
+    this.directory = directory;
+  }
+
+  public synchronized File getIndexFile() {
+    return indexFile;
+  }
+
+}

Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java Wed Dec 28 07:30:16 2011
@@ -16,9 +16,6 @@ import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.wal.avro.AvroWALEntry;
-import org.apache.wal.avro.AvroWALEntryAdapter;
-import org.apache.wal.avro.AvroWALWriter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -76,7 +73,7 @@ public class TestAvroWALWriter {
     for (int i = 0; i < 205; i++) {
       AvroWALEntryAdapter entry = new AvroWALEntryAdapter(new AvroWALEntry());
 
-      entry.getEntry().timeStamp = System.currentTimeMillis();
+      entry.getEntry().setTimeStamp(System.currentTimeMillis());
 
       writer.write(entry);
 

Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java?rev=1225126&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java Wed Dec 28 07:30:16 2011
@@ -0,0 +1,45 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+public class TestWALIndex {
+
+  private File testDirectory;
+  private WALIndex index;
+
+  @Before
+  public void setUp() {
+    testDirectory = new File("/tmp/wal-avro-index-"
+        + System.currentTimeMillis());
+
+    testDirectory.mkdirs();
+
+    index = new WALIndex();
+    index.setDirectory(testDirectory);
+  }
+
+  @SuppressWarnings("deprecation")
+  @After
+  public void tearDown() throws IOException {
+    Files.deleteRecursively(testDirectory.getCanonicalFile());
+  }
+
+  @Test
+  public void testUpdate() throws FileNotFoundException, IOException {
+    index.open();
+
+    index.updateIndex("foo", 0);
+    index.updateIndex("foo", 1);
+    index.updateIndex("foo", 2);
+    index.updateIndex("bar", 0);
+  }
+
+}