You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/12/19 01:32:36 UTC
git commit: FLUME-1794. FileChannel check for full disks in the background
Updated Branches:
refs/heads/trunk df132ecd7 -> 112e80a22
FLUME-1794. FileChannel check for full disks in the background
(Brock Noland via Hari Shreedharan)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/112e80a2
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/112e80a2
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/112e80a2
Branch: refs/heads/trunk
Commit: 112e80a22a11f8a9dcee4cc13066c1615a4752c3
Parents: df132ec
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Dec 18 16:30:56 2012 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Dec 18 16:32:21 2012 -0800
----------------------------------------------------------------------
.../java/org/apache/flume/channel/file/Log.java | 21 ++++++--
.../org/apache/flume/channel/file/LogFile.java | 38 ++++++++++++++-
.../apache/flume/channel/file/LogFileFactory.java | 5 +-
.../org/apache/flume/channel/file/LogFileV2.java | 5 +-
.../org/apache/flume/channel/file/LogFileV3.java | 6 ++-
.../flume/channel/file/TestFileChannelRestart.java | 1 +
.../org/apache/flume/channel/file/TestLog.java | 30 ++++++++++-
.../org/apache/flume/channel/file/TestLogFile.java | 6 +-
.../channel/file/TestTransactionEventRecordV3.java | 2 -
9 files changed, 93 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index af11dc5..8a4201c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -112,6 +112,7 @@ class Log {
private String encryptionCipherProvider;
private String encryptionKeyAlias;
private Key encryptionKey;
+ private final long usableSpaceRefreshInterval;
static class Builder {
private long bCheckpointInterval;
@@ -130,6 +131,12 @@ class Log {
private KeyProvider bEncryptionKeyProvider;
private String bEncryptionKeyAlias;
private String bEncryptionCipherProvider;
+ private long bUsableSpaceRefreshInterval = 15L * 1000L;
+
+ Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
+ bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
+ return this;
+ }
Builder setCheckpointInterval(long interval) {
bCheckpointInterval = interval;
@@ -206,7 +213,8 @@ class Log {
bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
bEncryptionKeyProvider, bEncryptionKeyAlias,
- bEncryptionCipherProvider, bLogDirs);
+ bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
+ bLogDirs);
}
}
@@ -215,13 +223,16 @@ class Log {
String name, boolean useLogReplayV1, boolean useFastReplay,
long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
@Nullable String encryptionKeyAlias,
- @Nullable String encryptionCipherProvider, File... logDirs)
+ @Nullable String encryptionCipherProvider,
+ long usableSpaceRefreshInterval, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
"checkpointInterval <= 0");
Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
Preconditions.checkNotNull(checkpointDir, "checkpointDir");
+ Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
+ "usableSpaceRefreshInterval <= 0");
Preconditions.checkArgument(
checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+ checkpointDir + " could not be created");
@@ -234,6 +245,7 @@ class Log {
this.useLogReplayV1 = useLogReplayV1;
this.useFastReplay = useFastReplay;
this.minimumRequiredSpace = minimumRequiredSpace;
+ this.usableSpaceRefreshInterval = usableSpaceRefreshInterval;
for (File logDir : logDirs) {
Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
"LogDir " + logDir + " could not be created");
@@ -292,7 +304,6 @@ class Log {
* directly before the shutdown or crash.
* @throws IOException
*/
- @SuppressWarnings("deprecation")
void replay() throws IOException {
Preconditions.checkState(!open, "Cannot replay after Log has been opened");
@@ -406,6 +417,7 @@ class Log {
}
}
+ @SuppressWarnings("deprecation")
private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
KeyProvider encryptionKeyProvider) throws Exception {
CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
@@ -801,7 +813,7 @@ class Log {
File file = new File(logDirs[index], PREFIX + fileID);
LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
maxFileSize, encryptionKey, encryptionKeyAlias,
- encryptionCipherProvider);
+ encryptionCipherProvider, usableSpaceRefreshInterval);
idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
encryptionKeyProvider));
// writer from this point on will get new reference
@@ -1021,7 +1033,6 @@ class Log {
private static final Logger LOG = LoggerFactory
.getLogger(BackgroundWorker.class);
private final Log log;
- private volatile boolean run = true;
public BackgroundWorker(Log log) {
this.log = log;
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 8089ff3..1db3717 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -36,6 +37,7 @@ import org.apache.flume.tools.DirectMemoryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -116,6 +118,34 @@ abstract class LogFile {
}
}
+ @VisibleForTesting
+ static class CachedFSUsableSpace {
+ private final File fs;
+ private final long interval;
+ private final AtomicLong lastRefresh;
+ private final AtomicLong value;
+
+ CachedFSUsableSpace(File fs, long interval) {
+ this.fs = fs;
+ this.interval = interval;
+ this.value = new AtomicLong(fs.getUsableSpace());
+ this.lastRefresh = new AtomicLong(System.currentTimeMillis());
+ }
+
+ void decrement(long numBytes) {
+ Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero");
+ value.addAndGet(-numBytes);
+ }
+ long getUsableSpace() {
+ long now = System.currentTimeMillis();
+ if(now - interval > lastRefresh.get()) {
+ value.set(fs.getUsableSpace());
+ lastRefresh.set(now);
+ }
+ return Math.max(value.get(), 0L);
+ }
+ }
+
static abstract class Writer {
private final int logFileID;
private final File file;
@@ -123,10 +153,12 @@ abstract class LogFile {
private final RandomAccessFile writeFileHandle;
private final FileChannel writeFileChannel;
private final CipherProvider.Encryptor encryptor;
+ private final CachedFSUsableSpace usableSpace;
private volatile boolean open;
+
Writer(File file, int logFileID, long maxFileSize,
- CipherProvider.Encryptor encryptor)
+ CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval)
throws IOException {
this.file = file;
this.logFileID = logFileID;
@@ -135,6 +167,7 @@ abstract class LogFile {
this.encryptor = encryptor;
writeFileHandle = new RandomAccessFile(file, "rw");
writeFileChannel = writeFileHandle.getChannel();
+ usableSpace = new CachedFSUsableSpace(file, usableSpaceRefreshInterval);
LOG.info("Opened " + file);
open = true;
}
@@ -156,7 +189,7 @@ abstract class LogFile {
}
long getUsableSpace() {
- return file.getUsableSpace();
+ return usableSpace.getUsableSpace();
}
long getMaxSize() {
@@ -205,6 +238,7 @@ abstract class LogFile {
Preconditions.checkState(offset >= 0, String.valueOf(offset));
// OP_RECORD + size + buffer
int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit();
+ usableSpace.decrement(recordLength);
preallocate(recordLength);
ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
toWrite.put(OP_RECORD);
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 1fe219a..9c98d8c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -67,13 +67,14 @@ class LogFileFactory {
static LogFile.Writer getWriter(File file, int logFileID,
long maxFileSize, @Nullable Key encryptionKey,
@Nullable String encryptionKeyAlias,
- @Nullable String encryptionCipherProvider) throws IOException {
+ @Nullable String encryptionCipherProvider,
+ long usableSpaceRefreshInterval) throws IOException {
Preconditions.checkState(!file.exists(), "File already exists " +
file.getAbsolutePath());
Preconditions.checkState(file.createNewFile(), "File could not be created "
+ file.getAbsolutePath());
return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey,
- encryptionKeyAlias, encryptionCipherProvider);
+ encryptionKeyAlias, encryptionCipherProvider, usableSpaceRefreshInterval);
}
static LogFile.RandomReader getRandomReader(File file,
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
index 4c593a4..f286c57 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -93,9 +93,10 @@ class LogFileV2 extends LogFile {
static class Writer extends LogFile.Writer {
- Writer(File file, int logFileID, long maxFileSize)
+ Writer(File file, int logFileID, long maxFileSize,
+ long usableSpaceRefreshInterval)
throws IOException {
- super(file, logFileID, maxFileSize, null);
+ super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval);
RandomAccessFile writeFileHandle = getFileHandle();
writeFileHandle.writeInt(getVersion());
writeFileHandle.writeInt(logFileID);
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index aac7805..f51935c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -169,10 +169,12 @@ class LogFileV3 extends LogFile {
Writer(File file, int logFileID, long maxFileSize,
@Nullable Key encryptionKey,
@Nullable String encryptionKeyAlias,
- @Nullable String encryptionCipherProvider)
+ @Nullable String encryptionCipherProvider,
+ long usableSpaceRefreshInterval)
throws IOException {
super(file, logFileID, maxFileSize, CipherProviderFactory.
- getEncrypter(encryptionCipherProvider, encryptionKey));
+ getEncrypter(encryptionCipherProvider, encryptionKey),
+ usableSpaceRefreshInterval);
ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
ProtosFactory.LogFileMetaData.newBuilder();
if(encryptionKey != null) {
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 3d5bf59..ea57cdb 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -311,6 +311,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
File inflight = new File(checkpointDir, name);
RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
writer.write(new Random().nextInt());
+ writer.close();
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index f9dbba5..6751714 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -18,6 +18,7 @@
*/
package org.apache.flume.channel.file;
+import static org.mockito.Mockito.*;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -184,7 +185,7 @@ public class TestLog {
}
public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException,
InterruptedException {
- long minimumRequireSpace = checkpointDir.getUsableSpace() -
+ long minimumRequiredSpace = checkpointDir.getUsableSpace() -
(10L* 1024L * 1024L);
log.close();
log = new Log.Builder().setCheckpointInterval(
@@ -192,12 +193,13 @@ public class TestLog {
FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
dataDirs).setChannelName("testlog").
- setMinimumRequiredSpace(minimumRequireSpace).build();
+ setMinimumRequiredSpace(minimumRequiredSpace)
+ .setUsableSpaceRefreshInterval(1L).build();
log.replay();
File filler = new File(checkpointDir, "filler");
byte[] buffer = new byte[64 * 1024];
FileOutputStream out = new FileOutputStream(filler);
- while(checkpointDir.getUsableSpace() > minimumRequireSpace) {
+ while(checkpointDir.getUsableSpace() > minimumRequiredSpace) {
out.write(buffer);
}
out.close();
@@ -436,6 +438,28 @@ public class TestLog {
Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
}
+ @Test
+ public void testCachedFSUsableSpace() throws Exception {
+ File fs = mock(File.class);
+ when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE);
+ LogFile.CachedFSUsableSpace cachedFS =
+ new LogFile.CachedFSUsableSpace(fs, 1000L);
+ Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE);
+ cachedFS.decrement(Integer.MAX_VALUE);
+ Assert.assertEquals(cachedFS.getUsableSpace(),
+ Long.MAX_VALUE - Integer.MAX_VALUE);
+ try {
+ cachedFS.decrement(-1);
+ Assert.fail();
+ } catch (IllegalArgumentException expected) {
+
+ }
+ when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L);
+ Thread.sleep(1100);
+ Assert.assertEquals(cachedFS.getUsableSpace(),
+ Long.MAX_VALUE - 1L);
+ }
+
private void takeAndVerify(FlumeEventPointer eventPointerIn,
FlumeEvent eventIn) throws IOException, InterruptedException {
FlumeEventQueue queue = log.getFlumeEventQueue();
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 9e28599..bef22ef 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -55,7 +55,7 @@ public class TestLogFile {
dataFile = new File(dataDir, String.valueOf(fileID));
Assert.assertTrue(dataDir.isDirectory());
logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
- Integer.MAX_VALUE, null, null, null);
+ Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE);
}
@After
public void cleanup() throws IOException {
@@ -72,7 +72,7 @@ public class TestLogFile {
Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
try {
LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
- null);
+ null, Long.MAX_VALUE);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
@@ -86,7 +86,7 @@ public class TestLogFile {
Assert.assertTrue(dataFile.mkdirs());
try {
LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
- null);
+ null, Long.MAX_VALUE);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
index 274ee7b..f403422 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -20,9 +20,7 @@ package org.apache.flume.channel.file;
import static org.mockito.Mockito.*;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;