You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/12/28 08:30:41 UTC
svn commit: r1225130 - in /incubator/flume/branches/flume-897/wal:
wal-avro/src/main/java/org/apache/wal/avro/
wal-avro/src/test/java/org/apache/wal/avro/
wal-core/src/main/java/org/apache/wal/ wal-core/src/test/java/org/apache/wal/
Author: esammer
Date: Wed Dec 28 07:30:40 2011
New Revision: 1225130
URL: http://svn.apache.org/viewvc?rev=1225130&view=rev
Log:
- Added open / close methods to WALReader interface to be consistent with writer.
- Fleshed out AvroWAL (kinda works).
- Implemented test for AvroWAL. Works(-ish).
- Updated the default event limit for AvroWALWriter to 100k instead of 100.
Added:
incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java
Modified:
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWAL.java Wed Dec 28 07:30:40 2011
@@ -3,63 +3,61 @@ package org.apache.wal.avro;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import java.util.Map;
import org.apache.wal.WAL;
+import org.apache.wal.WALException;
import org.apache.wal.WALReader;
import org.apache.wal.WALWriter;
-import com.google.common.io.Closeables;
-import com.google.common.io.Files;
-
public class AvroWAL implements WAL {
private File directory;
- private RandomAccessFile walRAF;
-
- private MappedByteBuffer indexBuffer;
+ private WALIndex index;
@Override
public void configure(Map<String, String> properties) {
- // TODO Auto-generated method stub
+ index = new WALIndex();
+ directory = new File(properties.get("directory"));
+
+ index.setDirectory(directory);
}
@Override
public void open() {
try {
- indexBuffer = (MappedByteBuffer) Files.map(new File(directory, "index"),
- FileChannel.MapMode.READ_WRITE, 8);
-
- walRAF = new RandomAccessFile(new File(directory, "data.wal"), "rwd");
+ index.open();
} catch (FileNotFoundException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new WALException("Unable to open WAL index. Exception follows.", e);
} catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
+ throw new WALException("Unable to open WAL index. Exception follows.", e);
}
}
@Override
public void close() {
- Closeables.closeQuietly(walRAF);
}
@Override
public WALReader getReader() {
- // TODO Auto-generated method stub
- return null;
+ AvroWALReader reader = new AvroWALReader();
+
+ reader.setDirectory(directory);
+ reader.setIndex(index);
+
+ return reader;
}
@Override
public WALWriter getWriter() {
- // TODO Auto-generated method stub
- return null;
+ AvroWALWriter writer = new AvroWALWriter();
+
+ writer.setDirectory(directory);
+ writer.setIndex(index);
+
+ return writer;
}
}
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALReader.java Wed Dec 28 07:30:40 2011
@@ -45,6 +45,7 @@ public class AvroWALReader implements WA
reader = new SpecificDatumReader<AvroWALEntry>(AvroWALEntry.class);
}
+ @Override
public void open() {
logger.info("Opening write ahead log reader for directory:{}", directory);
@@ -61,6 +62,11 @@ public class AvroWALReader implements WA
logger.debug("Opened write ahead log reader:{}", this);
}
+ @Override
+ public void close() {
+ closeWALFile();
+ }
+
private boolean ensureWALFile() {
if (decoder == null) {
File file = findNextFile();
@@ -170,14 +176,15 @@ public class AvroWALReader implements WA
if (ensureWALFile()) {
while (entry == null) {
try {
- entry = new AvroWALEntryAdapter(reader.read(null, decoder));
- currentPosition = inputChannel.position();
-
- if (logger.isDebugEnabled()) {
- logger
- .debug("Read entry:{} markPosition:{} currentPosition:{}",
- new Object[] { entry, index.getReadPosition(),
- currentPosition });
+ if (dataAvailable()) {
+ entry = new AvroWALEntryAdapter(reader.read(null, decoder));
+ currentPosition = inputChannel.position();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Read entry:{} markPosition:{} currentPosition:{}",
+ new Object[] { entry, index.getReadPosition(),
+ currentPosition });
+ }
}
} catch (EOFException e) {
/*
@@ -201,9 +208,21 @@ public class AvroWALReader implements WA
return entry;
}
+ private boolean dataAvailable() {
+ synchronized (index) {
+ if (!currentFile.getPath().equals(index.getWriteFile())
+ || currentPosition < index.getWritePosition()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+ }
+
@Override
public void mark() {
- logger.debug("Updating currentPosition to:{}", currentPosition);
+ logger.debug("Marking currentFile:{} currentPosition:{}", currentFile,
+ currentPosition);
index.updateReadIndex(currentFile.getPath(), currentPosition);
}
Modified: incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/main/java/org/apache/wal/avro/AvroWALWriter.java Wed Dec 28 07:30:40 2011
@@ -23,7 +23,7 @@ public class AvroWALWriter implements WA
private static final Logger logger = LoggerFactory
.getLogger(AvroWALWriter.class);
- private static final int defaultEventLimit = 100;
+ private static final int defaultEventLimit = 100000;
private File directory;
private int eventLimit;
Added: incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java?rev=1225130&view=auto
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java (added)
+++ incubator/flume/branches/flume-897/wal/wal-avro/src/test/java/org/apache/wal/avro/TestAvroWAL.java Wed Dec 28 07:30:40 2011
@@ -0,0 +1,116 @@
+package org.apache.wal.avro;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.wal.WALEntry;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.io.Files;
+
+public class TestAvroWAL {
+
+ private File testDirectory;
+ private AvroWAL wal;
+
+ @Before
+ public void setUp() {
+ testDirectory = new File("/tmp/wal-avro-" + System.currentTimeMillis());
+
+ testDirectory.mkdirs();
+
+ wal = new AvroWAL();
+
+ Map<String, String> conf = new HashMap<String, String>();
+
+ conf.put("directory", testDirectory.getPath());
+ wal.configure(conf);
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ Files.deleteRecursively(testDirectory.getCanonicalFile());
+ }
+
+ @Test
+ public void testReadWrite() throws InterruptedException {
+ wal.open();
+
+ final AvroWALReader reader = (AvroWALReader) wal.getReader();
+ final AvroWALWriter writer = (AvroWALWriter) wal.getWriter();
+
+ Runnable readerRunner = new Runnable() {
+
+ @Override
+ public void run() {
+ reader.open();
+
+ for (int i = 0; i < 5000; i++) {
+ WALEntry entry = reader.next();
+
+ if (i % 1000 == 0) {
+ reader.mark();
+ }
+
+ if (entry == null) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ }
+
+ reader.mark();
+ reader.close();
+ }
+
+ };
+
+ Runnable writerRunner = new Runnable() {
+
+ @Override
+ public void run() {
+ writer.open();
+
+ for (int i = 0; i < 5000; i++) {
+ AvroWALEntryAdapter entry = new AvroWALEntryAdapter(
+ new AvroWALEntry());
+
+ entry.getEntry().setTimeStamp((long) i);
+
+ writer.write(entry);
+
+ if (i % 100 == 0) {
+ writer.mark();
+ }
+ }
+
+ writer.mark();
+ writer.close();
+ }
+
+ };
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+
+ executor.submit(writerRunner);
+ executor.submit(readerRunner);
+
+ executor.shutdown();
+
+ while (!executor.isTerminated()) {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ }
+
+ wal.close();
+ }
+}
Modified: incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/main/java/org/apache/wal/WALReader.java Wed Dec 28 07:30:40 2011
@@ -2,8 +2,12 @@ package org.apache.wal;
public interface WALReader {
+ public void open();
+
public WALEntry next();
+ public void close();
+
public void mark();
public void reset();
Modified: incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java?rev=1225130&r1=1225129&r2=1225130&view=diff
==============================================================================
--- incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java (original)
+++ incubator/flume/branches/flume-897/wal/wal-core/src/test/java/org/apache/wal/TestWAL.java Wed Dec 28 07:30:40 2011
@@ -111,11 +111,21 @@ public class TestWAL {
private MockWAL wal;
@Override
+ public void open() {
+
+ }
+
+ @Override
public WALEntry next() {
return wal.next();
}
@Override
+ public void close() {
+
+ }
+
+ @Override
public void mark() {
}