You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2012/12/19 01:32:36 UTC

git commit: FLUME-1794. FileChannel check for full disks in the background

Updated Branches:
  refs/heads/trunk df132ecd7 -> 112e80a22


FLUME-1794. FileChannel check for full disks in the background

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/112e80a2
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/112e80a2
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/112e80a2

Branch: refs/heads/trunk
Commit: 112e80a22a11f8a9dcee4cc13066c1615a4752c3
Parents: df132ec
Author: Hari Shreedharan <hs...@apache.org>
Authored: Tue Dec 18 16:30:56 2012 -0800
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Tue Dec 18 16:32:21 2012 -0800

----------------------------------------------------------------------
 .../java/org/apache/flume/channel/file/Log.java    |   21 ++++++--
 .../org/apache/flume/channel/file/LogFile.java     |   38 ++++++++++++++-
 .../apache/flume/channel/file/LogFileFactory.java  |    5 +-
 .../org/apache/flume/channel/file/LogFileV2.java   |    5 +-
 .../org/apache/flume/channel/file/LogFileV3.java   |    6 ++-
 .../flume/channel/file/TestFileChannelRestart.java |    1 +
 .../org/apache/flume/channel/file/TestLog.java     |   30 ++++++++++-
 .../org/apache/flume/channel/file/TestLogFile.java |    6 +-
 .../channel/file/TestTransactionEventRecordV3.java |    2 -
 9 files changed, 93 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index af11dc5..8a4201c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -112,6 +112,7 @@ class Log {
   private String encryptionCipherProvider;
   private String encryptionKeyAlias;
   private Key encryptionKey;
+  private final long usableSpaceRefreshInterval;
 
   static class Builder {
     private long bCheckpointInterval;
@@ -130,6 +131,12 @@ class Log {
     private KeyProvider bEncryptionKeyProvider;
     private String bEncryptionKeyAlias;
     private String bEncryptionCipherProvider;
+    private long bUsableSpaceRefreshInterval = 15L * 1000L; // default 15,000; unit is ms (compared with currentTimeMillis() deltas)
+    // Builder-style setter (returns this); the Log constructor later rejects values <= 0.
+    Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
+      bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
+      return this;
+    }
 
     Builder setCheckpointInterval(long interval) {
       bCheckpointInterval = interval;
@@ -206,7 +213,8 @@ class Log {
           bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
           useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
           bEncryptionKeyProvider, bEncryptionKeyAlias,
-          bEncryptionCipherProvider, bLogDirs);
+          bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
+          bLogDirs);
     }
   }
 
@@ -215,13 +223,16 @@ class Log {
       String name, boolean useLogReplayV1, boolean useFastReplay,
       long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
       @Nullable String encryptionKeyAlias,
-      @Nullable String encryptionCipherProvider, File... logDirs)
+      @Nullable String encryptionCipherProvider,
+      long usableSpaceRefreshInterval, File... logDirs)
           throws IOException {
     Preconditions.checkArgument(checkpointInterval > 0,
         "checkpointInterval <= 0");
     Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
     Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
     Preconditions.checkNotNull(checkpointDir, "checkpointDir");
+    Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
+        "usableSpaceRefreshInterval <= 0");
     Preconditions.checkArgument(
         checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
             + checkpointDir + " could not be created");
@@ -234,6 +245,7 @@ class Log {
     this.useLogReplayV1 = useLogReplayV1;
     this.useFastReplay = useFastReplay;
     this.minimumRequiredSpace = minimumRequiredSpace;
+    this.usableSpaceRefreshInterval = usableSpaceRefreshInterval;
     for (File logDir : logDirs) {
       Preconditions.checkArgument(logDir.isDirectory() || logDir.mkdirs(),
           "LogDir " + logDir + " could not be created");
@@ -292,7 +304,6 @@ class Log {
    * directly before the shutdown or crash.
    * @throws IOException
    */
-  @SuppressWarnings("deprecation")
   void replay() throws IOException {
     Preconditions.checkState(!open, "Cannot replay after Log has been opened");
 
@@ -406,6 +417,7 @@ class Log {
     }
   }
 
+  @SuppressWarnings("deprecation")
   private void doReplay(FlumeEventQueue queue, List<File> dataFiles,
           KeyProvider encryptionKeyProvider) throws Exception {
     CheckpointRebuilder rebuilder = new CheckpointRebuilder(dataFiles,
@@ -801,7 +813,7 @@ class Log {
           File file = new File(logDirs[index], PREFIX + fileID);
           LogFile.Writer writer = LogFileFactory.getWriter(file, fileID,
               maxFileSize, encryptionKey, encryptionKeyAlias,
-              encryptionCipherProvider);
+              encryptionCipherProvider, usableSpaceRefreshInterval);
           idLogFileMap.put(fileID, LogFileFactory.getRandomReader(file,
               encryptionKeyProvider));
           // writer from this point on will get new reference
@@ -1021,7 +1033,6 @@ class Log {
     private static final Logger LOG = LoggerFactory
         .getLogger(BackgroundWorker.class);
     private final Log log;
-    private volatile boolean run = true;
 
     public BackgroundWorker(Log log) {
       this.log = log;

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 8089ff3..1db3717 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -27,6 +27,7 @@ import java.nio.channels.FileChannel;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
@@ -36,6 +37,7 @@ import org.apache.flume.tools.DirectMemoryUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -116,6 +118,34 @@ abstract class LogFile {
     }
   }
 
+  @VisibleForTesting // Caches File.getUsableSpace() for fs; the file is re-read only once the cached sample is stale.
+  static class CachedFSUsableSpace {
+    private final File fs; // file whose getUsableSpace() is sampled
+    private final long interval; // staleness threshold in ms (compared with currentTimeMillis() deltas)
+    private final AtomicLong lastRefresh; // currentTimeMillis() timestamp of the most recent sample
+    private final AtomicLong value; // cached usable bytes; decrement() can push it below zero
+    // Takes the first sample immediately and stamps it with the current time.
+    CachedFSUsableSpace(File fs, long interval) {
+      this.fs = fs;
+      this.interval = interval;
+      this.value = new AtomicLong(fs.getUsableSpace());
+      this.lastRefresh = new AtomicLong(System.currentTimeMillis());
+    }
+    // Subtracts numBytes from the cached value; a negative numBytes fails the checkArgument precondition.
+    void decrement(long numBytes) {
+      Preconditions.checkArgument(numBytes >= 0, "numBytes less than zero");
+      value.addAndGet(-numBytes);
+    }
+    long getUsableSpace() { // returns the cached value, clamped so it is never below zero
+      long now = System.currentTimeMillis();
+      if(now - interval > lastRefresh.get()) { // more than interval ms since the last sample: re-read fs
+        value.set(fs.getUsableSpace()); // overwrites the cache, discarding earlier decrements
+        lastRefresh.set(now); // NOTE(review): check + two set() calls are separate atomic ops, not one atomic step
+      }
+      return Math.max(value.get(), 0L);
+    }
+  }
+
   static abstract class Writer {
     private final int logFileID;
     private final File file;
@@ -123,10 +153,12 @@ abstract class LogFile {
     private final RandomAccessFile writeFileHandle;
     private final FileChannel writeFileChannel;
     private final CipherProvider.Encryptor encryptor;
+    private final CachedFSUsableSpace usableSpace;
     private volatile boolean open;
 
+
     Writer(File file, int logFileID, long maxFileSize,
-        CipherProvider.Encryptor encryptor)
+        CipherProvider.Encryptor encryptor, long usableSpaceRefreshInterval)
         throws IOException {
       this.file = file;
       this.logFileID = logFileID;
@@ -135,6 +167,7 @@ abstract class LogFile {
       this.encryptor = encryptor;
       writeFileHandle = new RandomAccessFile(file, "rw");
       writeFileChannel = writeFileHandle.getChannel();
+      usableSpace = new CachedFSUsableSpace(file, usableSpaceRefreshInterval);
       LOG.info("Opened " + file);
       open = true;
     }
@@ -156,7 +189,7 @@ abstract class LogFile {
     }
 
     long getUsableSpace() {
-      return file.getUsableSpace();
+      return usableSpace.getUsableSpace();
     }
 
     long getMaxSize() {
@@ -205,6 +238,7 @@ abstract class LogFile {
       Preconditions.checkState(offset >= 0, String.valueOf(offset));
       // OP_RECORD + size + buffer
       int recordLength = 1 + (int)Serialization.SIZE_OF_INT + buffer.limit();
+      usableSpace.decrement(recordLength);
       preallocate(recordLength);
       ByteBuffer toWrite = ByteBuffer.allocate(recordLength);
       toWrite.put(OP_RECORD);

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
index 1fe219a..9c98d8c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileFactory.java
@@ -67,13 +67,14 @@ class LogFileFactory {
   static LogFile.Writer getWriter(File file, int logFileID,
       long maxFileSize, @Nullable Key encryptionKey,
       @Nullable String encryptionKeyAlias,
-      @Nullable String encryptionCipherProvider) throws IOException {
+      @Nullable String encryptionCipherProvider,
+      long usableSpaceRefreshInterval) throws IOException {
     Preconditions.checkState(!file.exists(), "File already exists "  +
       file.getAbsolutePath());
     Preconditions.checkState(file.createNewFile(), "File could not be created "
         + file.getAbsolutePath());
     return new LogFileV3.Writer(file, logFileID, maxFileSize, encryptionKey,
-        encryptionKeyAlias, encryptionCipherProvider);
+        encryptionKeyAlias, encryptionCipherProvider, usableSpaceRefreshInterval);
   }
 
   static LogFile.RandomReader getRandomReader(File file,

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
index 4c593a4..f286c57 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV2.java
@@ -93,9 +93,10 @@ class LogFileV2 extends LogFile {
 
   static class Writer extends LogFile.Writer {
 
-    Writer(File file, int logFileID, long maxFileSize)
+    Writer(File file, int logFileID, long maxFileSize,
+        long usableSpaceRefreshInterval)
         throws IOException {
-      super(file, logFileID, maxFileSize, null);
+      super(file, logFileID, maxFileSize, null, usableSpaceRefreshInterval);
       RandomAccessFile writeFileHandle = getFileHandle();
       writeFileHandle.writeInt(getVersion());
       writeFileHandle.writeInt(logFileID);

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index aac7805..f51935c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -169,10 +169,12 @@ class LogFileV3 extends LogFile {
     Writer(File file, int logFileID, long maxFileSize,
         @Nullable Key encryptionKey,
         @Nullable String encryptionKeyAlias,
-        @Nullable String encryptionCipherProvider)
+        @Nullable String encryptionCipherProvider,
+        long usableSpaceRefreshInterval)
         throws IOException {
       super(file, logFileID, maxFileSize, CipherProviderFactory.
-          getEncrypter(encryptionCipherProvider, encryptionKey));
+          getEncrypter(encryptionCipherProvider, encryptionKey),
+          usableSpaceRefreshInterval);
       ProtosFactory.LogFileMetaData.Builder metaDataBuilder =
           ProtosFactory.LogFileMetaData.newBuilder();
       if(encryptionKey != null) {

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 3d5bf59..ea57cdb 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -311,6 +311,7 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     File inflight = new File(checkpointDir, name);
     RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
     writer.write(new Random().nextInt());
+    writer.close();
     channel = createFileChannel(overrides);
     channel.start();
     Assert.assertTrue(channel.isOpen());

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index f9dbba5..6751714 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel.file;
 
+import static org.mockito.Mockito.*;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -184,7 +185,7 @@ public class TestLog {
   }
   public void doTestMinimumRequiredSpaceTooSmallForPut() throws IOException,
     InterruptedException {
-    long minimumRequireSpace = checkpointDir.getUsableSpace() -
+    long minimumRequiredSpace = checkpointDir.getUsableSpace() -
         (10L* 1024L * 1024L);
     log.close();
     log = new Log.Builder().setCheckpointInterval(
@@ -192,12 +193,13 @@ public class TestLog {
             FileChannelConfiguration.DEFAULT_MAX_FILE_SIZE).setQueueSize(
             CAPACITY).setCheckpointDir(checkpointDir).setLogDirs(
                 dataDirs).setChannelName("testlog").
-                setMinimumRequiredSpace(minimumRequireSpace).build();
+                setMinimumRequiredSpace(minimumRequiredSpace)
+                .setUsableSpaceRefreshInterval(1L).build();
     log.replay();
     File filler = new File(checkpointDir, "filler");
     byte[] buffer = new byte[64 * 1024];
     FileOutputStream out = new FileOutputStream(filler);
-    while(checkpointDir.getUsableSpace() > minimumRequireSpace) {
+    while(checkpointDir.getUsableSpace() > minimumRequiredSpace) {
       out.write(buffer);
     }
     out.close();
@@ -436,6 +438,28 @@ public class TestLog {
     Assert.assertEquals(eventIn.getHeaders(), eventOut.getHeaders());
     Assert.assertArrayEquals(eventIn.getBody(), eventOut.getBody());
   }
+  @Test // Exercises CachedFSUsableSpace with a mocked File: initial read, decrement, argument check, refresh.
+  public void testCachedFSUsableSpace() throws Exception {
+    File fs = mock(File.class); // Mockito mock, so the reported usable space is under test control
+    when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE);
+    LogFile.CachedFSUsableSpace cachedFS =
+        new LogFile.CachedFSUsableSpace(fs, 1000L); // 1000 ms refresh interval
+    Assert.assertEquals(cachedFS.getUsableSpace(), Long.MAX_VALUE);
+    cachedFS.decrement(Integer.MAX_VALUE);
+    Assert.assertEquals(cachedFS.getUsableSpace(), // still the cached sample (assuming < 1000 ms elapsed), minus the decrement
+        Long.MAX_VALUE - Integer.MAX_VALUE);
+    try {
+      cachedFS.decrement(-1);
+      Assert.fail(); // reached only if the negative decrement was accepted
+    } catch (IllegalArgumentException expected) {
+      // a negative decrement is expected to be rejected
+    }
+    when(fs.getUsableSpace()).thenReturn(Long.MAX_VALUE - 1L); // change what the mock reports
+    Thread.sleep(1100); // wait past the 1000 ms interval so the next read re-samples the mock
+    Assert.assertEquals(cachedFS.getUsableSpace(), // refresh overwrites the cache, so the earlier decrement is gone
+        Long.MAX_VALUE - 1L);
+  }
+
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn) throws IOException, InterruptedException {
     FlumeEventQueue queue = log.getFlumeEventQueue();

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
index 9e28599..bef22ef 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLogFile.java
@@ -55,7 +55,7 @@ public class TestLogFile {
     dataFile = new File(dataDir, String.valueOf(fileID));
     Assert.assertTrue(dataDir.isDirectory());
     logFileWriter = LogFileFactory.getWriter(dataFile, fileID,
-        Integer.MAX_VALUE, null, null, null);
+        Integer.MAX_VALUE, null, null, null, Long.MAX_VALUE);
   }
   @After
   public void cleanup() throws IOException {
@@ -72,7 +72,7 @@ public class TestLogFile {
     Assert.assertTrue(dataFile.isFile() || dataFile.createNewFile());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null);
+          null, Long.MAX_VALUE);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),
@@ -86,7 +86,7 @@ public class TestLogFile {
     Assert.assertTrue(dataFile.mkdirs());
     try {
       LogFileFactory.getWriter(dataFile, fileID, Integer.MAX_VALUE, null, null,
-          null);
+          null, Long.MAX_VALUE);
       Assert.fail();
     } catch (IllegalStateException e) {
       Assert.assertEquals("File already exists " + dataFile.getAbsolutePath(),

http://git-wip-us.apache.org/repos/asf/flume/blob/112e80a2/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
index 274ee7b..f403422 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestTransactionEventRecordV3.java
@@ -20,9 +20,7 @@ package org.apache.flume.channel.file;
 
 import static org.mockito.Mockito.*;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;