You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/28 08:30:16 UTC
svn commit: r1225126 - in /incubator/flume/branches/flume-897: ./
wal/wal-avro/src/main/avro/ wal/wal-avro/src/main/java/org/apache/wal/avro/
wal/wal-avro/src/test/java/org/apache/wal/avro/
Author: esammer
Date: Wed Dec 28 07:30:16 2011
New Revision: 1225126
URL: http://svn.apache.org/viewvc?rev=1225126&view=rev
Log:
- Refactoring wal and extracting index maintenance into a separate class.
Added:
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
Modified:
incubator/flume/branches/flume-897/pom.xml
incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
Modified: incubator/flume/branches/flume-897/pom.xml
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/pom.xml?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/pom.xml (original)
+++ incubator/flume/branches/flume-897/pom.xml Wed Dec 28 07:30:16 2011
@@ -461,7 +461,7 @@ limitations under the License.
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>r07</version>
+ <version>10.0.1</version>
</dependency>
<dependency>
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/avro/wal.avdl Wed Dec 28 07:30:16 2011
@@ -6,4 +6,14 @@ protocol AvroWALProtocol {
long timeStamp;
}
+ record AvroWALIndexEntry { // one index entry: a position paired with a path
+ long position; // AvroWALWriter stores its WAL output-channel position here
+ string path; // AvroWALWriter stores the path of the WAL file here
+ }
+
+ record AvroWALIndex { // index document: a format version plus a list of entries
+ int version; // AvroWALWriter writes its writeIndexVersion constant (1) here
+ array<AvroWALIndexEntry> entries; // AvroWALWriter currently writes exactly one entry per update
+ }
+
}
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java Wed Dec 28 07:30:16 2011
@@ -1,11 +1,16 @@
package org.apache.wal.avro;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Arrays;
import org.apache.avro.io.Encoder;
+import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.wal.WALEntry;
import org.apache.wal.WALWriter;
@@ -13,32 +18,90 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
public class AvroWALWriter implements WALWriter {
+ private static final short writeIndexVersion = 1; // version stamped into every AvroWALIndex built by updateWriteIndex()
private static final Logger logger = LoggerFactory
.getLogger(AvroWALWriter.class);
+ private File directory; // directory in which open() creates the .wal file and maps write.idx
+
private FileOutputStream walOutputStream;
private MappedByteBuffer indexBuffer;
private Encoder encoder;
private SpecificDatumWriter<AvroWALEntry> writer;
+ private ByteArrayOutputStream indexOutputStream; // scratch buffer the index is encoded into before it is copied to indexBuffer
+ private Encoder indexEncoder; // JSON encoder over indexOutputStream, created in open()
+ private SpecificDatumWriter<AvroWALIndex> indexWriter; // serializes AvroWALIndex records through indexEncoder
+
+ private File currentFile; // WAL file created by the most recent open()
private long currentPosition;
private FileChannel outputChannel;
@Override
public void open() {
- outputChannel = walOutputStream.getChannel();
+ logger.info("Opening write ahead log in:{}", directory); // open(): starts a new WAL file in `directory` and initializes the write index
+
+ currentFile = new File(directory, System.currentTimeMillis() + ".wal"); // file name is the current wall-clock millis plus the .wal suffix
try {
- indexBuffer.putLong(0, outputChannel.position());
+ walOutputStream = new FileOutputStream(currentFile, true); // second argument true = open in append mode
+ outputChannel = walOutputStream.getChannel();
+ currentPosition = outputChannel.position(); // channel position as reported immediately after opening
+
+ indexOutputStream = new ByteArrayOutputStream(1024); // 1024 is only the initial capacity; the stream grows on demand
+ indexEncoder = EncoderFactory.get().jsonEncoder(AvroWALIndex.SCHEMA$,
+ indexOutputStream);
+ indexWriter = new SpecificDatumWriter<AvroWALIndex>(AvroWALIndex.class);
+
+ indexBuffer = Files.map(new File(directory, "write.idx"),
+ FileChannel.MapMode.READ_WRITE, 8 * 1024); // maps the first 8 KiB of write.idx for reading and writing
+
+ updateWriteIndex(currentPosition, currentFile.getPath()); // persist the starting position for the new file
+ } catch (FileNotFoundException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace(); // NOTE(review): failure is only printed; execution continues after the try block
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
+ encoder = EncoderFactory.get().directBinaryEncoder(walOutputStream, null); // NOTE(review): runs even when the try block failed, in which case walOutputStream may not have been assigned
+ writer = new SpecificDatumWriter<AvroWALEntry>(AvroWALEntry.class);
+
+ logger.debug("Opened write ahead log:{} currentPosition:{}", currentFile,
+ currentPosition);
+ }
+
+  /**
+   * Rewrites the memory-mapped write index so that it holds a single entry
+   * for the given WAL position and file path.
+   *
+   * <p>The index is first serialized as Avro JSON into the in-memory scratch
+   * stream, then copied to the start of the mapped index file and forced to
+   * disk. If serialization fails, the error is logged and the previously
+   * persisted index is left untouched.</p>
+   *
+   * @param currentPosition position within the WAL file to record
+   * @param path path of the WAL file the position refers to
+   */
+  private void updateWriteIndex(long currentPosition, String path) {
+    logger.debug("Updating write index to position:{} path:{}",
+        currentPosition, path);
+
+    // Use the path argument rather than re-reading currentFile, so the entry
+    // always matches what the caller asked to record.
+    AvroWALIndex index = AvroWALIndex
+        .newBuilder()
+        .setVersion(writeIndexVersion)
+        .setEntries(
+            Arrays.asList(AvroWALIndexEntry.newBuilder()
+                .setPath(path).setPosition(currentPosition)
+                .build())).build();
+
+    indexOutputStream.reset();
+
+    try {
+      indexWriter.write(index, indexEncoder);
+      indexEncoder.flush();
+    } catch (IOException e) {
+      // Do not persist a partially encoded record; keep the old index.
+      logger.error("Unable to encode write index for path:" + path, e);
+      return;
+    }
+
+    // clear() resets position to 0 and limit to capacity. The previous code
+    // called flip() after writing, which shrank the limit to the size of the
+    // last record and made the next, longer record overflow the buffer.
+    indexBuffer.clear();
+    indexBuffer.put(indexOutputStream.toByteArray());
indexBuffer.force();
}
@Override
@@ -49,7 +112,7 @@ public class AvroWALWriter implements WA
try {
writer.write(((AvroWALEntryAdapter) entry).getEntry(), encoder);
encoder.flush();
- outputChannel.force(false);
+ outputChannel.force(true);
currentPosition = outputChannel.position();
if (logger.isDebugEnabled()) {
@@ -63,12 +126,14 @@ public class AvroWALWriter implements WA
@Override
public void close() {
+ logger.info("Closing write ahead log at:{}", currentFile);
+
+ Closeables.closeQuietly(outputChannel); // closes the WAL channel; closeQuietly does not propagate an IOException raised by close()
}
@Override
public void mark() {
- indexBuffer.putLong(0, currentPosition);
- indexBuffer.force();
+ updateWriteIndex(currentPosition, currentFile.getPath()); // persist currentPosition (refreshed after each write) together with the current WAL file path
}
@Override
Added: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java?rev=1225126&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/WALIndex.java Wed Dec 28 07:30:16 2011
@@ -0,0 +1,51 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+
+public class WALIndex {
+
+ private static final String indexFileName = "wal-idx";
+ private static final Logger logger = LoggerFactory.getLogger(WALIndex.class);
+
+ private File directory;
+
+ private File indexFile;
+ private MappedByteBuffer indexBuffer;
+
+ public synchronized void open() throws FileNotFoundException, IOException {
+ indexFile = new File(directory, indexFileName);
+
+ indexBuffer = Files.map(indexFile, FileChannel.MapMode.READ_WRITE,
+ 16 * 1024);
+ }
+
+ public synchronized void updateIndex(String file, long position) {
+ logger.info("Updating WAL index with file:{} position:{}", file, position);
+
+ indexBuffer.putLong(position).putInt(file.length()).put(file.getBytes());
+ indexBuffer.force();
+ indexBuffer.position(0);
+ }
+
+ public synchronized File getDirectory() {
+ return directory;
+ }
+
+ public synchronized void setDirectory(File directory) {
+ this.directory = directory;
+ }
+
+ public synchronized File getIndexFile() {
+ return indexFile;
+ }
+
+}
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java?rev=1225126&r1=1225125&r2=1225126&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWALWriter.java Wed Dec 28 07:30:16 2011
@@ -16,9 +16,6 @@ import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.wal.avro.AvroWALEntry;
-import org.apache.wal.avro.AvroWALEntryAdapter;
-import org.apache.wal.avro.AvroWALWriter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -76,7 +73,7 @@ public class TestAvroWALWriter {
for (int i = 0; i < 205; i++) {
AvroWALEntryAdapter entry = new AvroWALEntryAdapter(new AvroWALEntry());
- entry.getEntry().timeStamp = System.currentTimeMillis();
+ entry.getEntry().setTimeStamp(System.currentTimeMillis());
writer.write(entry);
Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java?rev=1225126&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestWALIndex.java Wed Dec 28 07:30:16 2011
@@ -0,0 +1,45 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+/**
+ * Exercises {@link WALIndex} against a throw-away directory created under the
+ * JVM's temporary directory.
+ */
+public class TestWALIndex {
+
+  private File testDirectory;
+  private WALIndex index;
+
+  /** Creates a fresh, uniquely named working directory and an index bound to it. */
+  @Before
+  public void setUp() {
+    // Resolve the temp location from the JVM instead of hard-coding /tmp so
+    // the test also runs on platforms without that directory.
+    testDirectory = new File(System.getProperty("java.io.tmpdir"),
+        "wal-avro-index-" + System.currentTimeMillis());
+
+    // Fail here, with a clear message, rather than later inside open().
+    if (!testDirectory.mkdirs() && !testDirectory.isDirectory()) {
+      throw new IllegalStateException("Unable to create test directory "
+          + testDirectory);
+    }
+
+    index = new WALIndex();
+    index.setDirectory(testDirectory);
+  }
+
+  /** Removes the working directory and everything the test wrote into it. */
+  @SuppressWarnings("deprecation")
+  @After
+  public void tearDown() throws IOException {
+    Files.deleteRecursively(testDirectory.getCanonicalFile());
+  }
+
+  /** Opens the index and applies several updates, including a change of file. */
+  @Test
+  public void testUpdate() throws FileNotFoundException, IOException {
+    index.open();
+
+    org.junit.Assert.assertTrue("index file should exist after open()",
+        index.getIndexFile().isFile());
+
+    index.updateIndex("foo", 0);
+    index.updateIndex("foo", 1);
+    index.updateIndex("foo", 2);
+    index.updateIndex("bar", 0);
+  }
+
+}