You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/04/05 20:27:33 UTC
[1/2] FLUME-1516: FileChannel Write Dual Checkpoints to avoid replays
Updated Branches:
refs/heads/trunk df7a197a5 -> 6ca616800
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 170dc72..fb0e208 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -18,12 +18,13 @@
*/
package org.apache.flume.channel.file;
-import static org.apache.flume.channel.file.TestUtils.*;
-
-import java.io.File;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.fest.reflect.exception.ReflectionError;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -31,12 +32,23 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
+import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.Map;
import java.util.Random;
-import org.apache.flume.channel.file.proto.ProtosFactory;
+import java.util.Set;
+
+import static org.apache.flume.channel.file.TestUtils.compareInputAndOut;
+import static org.apache.flume.channel.file.TestUtils.consumeChannel;
+import static org.apache.flume.channel.file.TestUtils.fillChannel;
+import static org.apache.flume.channel.file.TestUtils.forceCheckpoint;
+import static org.apache.flume.channel.file.TestUtils.putEvents;
+import static org.apache.flume.channel.file.TestUtils.takeEvents;
+import static org.fest.reflect.core.Reflection.*;
public class TestFileChannelRestart extends TestFileChannelBase {
protected static final Logger LOG = LoggerFactory
@@ -119,16 +131,32 @@ public class TestFileChannelRestart extends TestFileChannelBase {
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
+ @Test
+ public void testRestartWhenMetaDataExistsButCheckpointDoesNot() throws
+ Exception {
+ doTestRestartWhenMetaDataExistsButCheckpointDoesNot(false); // backup=false: USE_DUAL_CHECKPOINTS off
+ }
+
@Test
- public void testRestartWhenMetaDataExistsButCheckpointDoesNot()
+ public void testRestartWhenMetaDataExistsButCheckpointDoesNotWithBackup()
throws Exception {
+ doTestRestartWhenMetaDataExistsButCheckpointDoesNot(true);
+ }
+
+ private void doTestRestartWhenMetaDataExistsButCheckpointDoesNot(
+ boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
Assert.assertTrue(checkpoint.delete());
@@ -139,19 +167,36 @@ public class TestFileChannelRestart extends TestFileChannelBase {
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
+ @Test
+ public void testRestartWhenCheckpointExistsButMetaDoesNot() throws Exception{
+ doTestRestartWhenCheckpointExistsButMetaDoesNot(false); // backup=false: USE_DUAL_CHECKPOINTS off
+ }
+
@Test
- public void testRestartWhenCheckpointExistsButMetaDoesNot()
+ public void testRestartWhenCheckpointExistsButMetaDoesNotWithBackup() throws
+ Exception{
+ doTestRestartWhenCheckpointExistsButMetaDoesNot(true);
+ }
+
+
+ private void doTestRestartWhenCheckpointExistsButMetaDoesNot(boolean backup)
throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -162,19 +207,34 @@ public class TestFileChannelRestart extends TestFileChannelBase {
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testRestartWhenNoCheckpointExists() throws Exception {
+ doTestRestartWhenNoCheckpointExists(false);
+ }
+
+ @Test
+ public void testRestartWhenNoCheckpointExistsWithBackup() throws Exception {
+ doTestRestartWhenNoCheckpointExists(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestRestartWhenNoCheckpointExists(boolean backup) throws
+ Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
File checkpointMetaData = Serialization.getMetaDataFile(checkpoint);
@@ -185,19 +245,33 @@ public class TestFileChannelRestart extends TestFileChannelBase {
Assert.assertTrue(channel.isOpen());
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(checkpointMetaData.exists());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
- public void testBadCheckpointVersion() throws Exception{
+ public void testBadCheckpointVersion() throws Exception {
+ doTestBadCheckpointVersion(false);
+ }
+
+ @Test
+ public void testBadCheckpointVersionWithBackup() throws Exception {
+ doTestBadCheckpointVersion(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestBadCheckpointVersion(boolean backup) throws Exception{
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -209,19 +283,34 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testBadCheckpointMetaVersion() throws Exception {
+ doTestBadCheckpointMetaVersion(false);
+ }
+
+ @Test
+ public void testBadCheckpointMetaVersionWithBackup() throws Exception {
+ doTestBadCheckpointMetaVersion(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestBadCheckpointMetaVersion(boolean backup) throws
+ Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -235,19 +324,35 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testDifferingOrderIDCheckpointAndMetaVersion() throws Exception {
+ doTestDifferingOrderIDCheckpointAndMetaVersion(false);
+ }
+
+ @Test
+ public void testDifferingOrderIDCheckpointAndMetaVersionWithBackup() throws
+ Exception {
+ doTestDifferingOrderIDCheckpointAndMetaVersion(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestDifferingOrderIDCheckpointAndMetaVersion(boolean backup)
+ throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
FileInputStream is = new FileInputStream(Serialization.getMetaDataFile(checkpoint));
@@ -261,19 +366,33 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
- public void testIncompleteCheckpoint() throws Exception {
+ public void testIncompleteCheckpoint() throws Exception{
+ doTestIncompleteCheckpoint(false);
+ }
+
+ @Test // NOTE(review): this is the backup variant; sibling tests use the "WithBackup" suffix
+ public void testIncompleteCheckpointWithCheckpoint() throws Exception{
+ doTestIncompleteCheckpoint(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestIncompleteCheckpoint(boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
@@ -285,18 +404,29 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testCorruptInflightPuts() throws Exception {
- testCorruptInflights("inflightPuts");
+ doTestCorruptInflights("inflightPuts", false);
+ }
+
+ @Test
+ public void testCorruptInflightPutsWithBackup() throws Exception {
+ doTestCorruptInflights("inflightPuts", true);
}
@Test
public void testCorruptInflightTakes() throws Exception {
- testCorruptInflights("inflightTakes");
+ doTestCorruptInflights("inflightTakes", false);
+ }
+
+ @Test
+ public void testCorruptInflightTakesWithBackup() throws Exception {
+ doTestCorruptInflights("inflightTakes", true);
}
@Test
@@ -352,14 +482,19 @@ public class TestFileChannelRestart extends TestFileChannelBase {
compareInputAndOut(in, out);
}
- private void testCorruptInflights(String name) throws Exception {
+ private void doTestCorruptInflights(String name,
+ boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File inflight = new File(checkpointDir, name);
RandomAccessFile writer = new RandomAccessFile(inflight, "rw");
@@ -368,19 +503,33 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testTruncatedCheckpointMeta() throws Exception {
+ doTestTruncatedCheckpointMeta(false);
+ }
+
+ @Test
+ public void testTruncatedCheckpointMetaWithBackup() throws Exception {
+ doTestTruncatedCheckpointMeta(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestTruncatedCheckpointMeta(boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(
@@ -391,19 +540,33 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
@Test
public void testCorruptCheckpointMeta() throws Exception {
+ doTestCorruptCheckpointMeta(false);
+ }
+
+ @Test
+ public void testCorruptCheckpointMetaWithBackup() throws Exception {
+ doTestCorruptCheckpointMeta(true); // backup=true: USE_DUAL_CHECKPOINTS on
+ }
+
+ private void doTestCorruptCheckpointMeta(boolean backup) throws Exception {
Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, String.valueOf(backup));
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
Set<String> in = putEvents(channel, "restart", 10, 100);
Assert.assertEquals(100, in.size());
forceCheckpoint(channel);
+ if(backup) {
+ Thread.sleep(2000);
+ }
channel.stop();
File checkpoint = new File(checkpointDir, "checkpoint");
RandomAccessFile writer = new RandomAccessFile(
@@ -415,10 +578,19 @@ public class TestFileChannelRestart extends TestFileChannelBase {
channel = createFileChannel(overrides);
channel.start();
Assert.assertTrue(channel.isOpen());
+ Assert.assertTrue(!backup || channel.checkpointBackupRestored());
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+ private void checkIfBackupUsed(boolean backup) {
+ // Fails (AssertionError, no message) unless the restore flag equals backup.
+ final boolean restoredFromBackup = channel.checkpointBackupRestored();
+ if (restoredFromBackup == backup) {
+ return;
+ }
+ Assert.fail();
+ }
@Test
public void testWithExtraLogs()
@@ -445,4 +617,158 @@ public class TestFileChannelRestart extends TestFileChannelBase {
Set<String> out = consumeChannel(channel);
compareInputAndOut(in, out);
}
+
+ // Make sure the entire channel was not replayed, only the events logged
+ // after the backed-up checkpoint. Temp dirs are removed even on failure.
+ @Test
+ public void testBackupUsedEnsureNoFullReplay() throws Exception {
+ File dataDir = Files.createTempDir();
+ File tempBackup = Files.createTempDir();
+ try {
+ Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.DATA_DIRS, dataDir.getAbsolutePath());
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+ channel = createFileChannel(overrides);
+ channel.start();
+ Assert.assertTrue(channel.isOpen());
+ Set<String> in = putEvents(channel, "restart", 10, 100);
+ Assert.assertEquals(100, in.size());
+ forceCheckpoint(channel);
+ // Give the asynchronous checkpoint backup time to finish.
+ Thread.sleep(2000);
+ in = putEvents(channel, "restart", 10, 100);
+ takeEvents(channel, 10, 100);
+ Assert.assertEquals(100, in.size());
+ // Snapshot the backup of the first checkpoint (skipping the lock file).
+ for(File file : backupDir.listFiles()) {
+ if(!file.getName().equals(Log.FILE_LOCK)) {
+ Files.copy(file, new File(tempBackup, file.getName()));
+ }
+ }
+ forceCheckpoint(channel);
+ channel.stop();
+ Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES);
+ // The latest checkpoint may already have been backed up (the checkpoint
+ // is tiny in unit tests), so replace the backup with the older snapshot
+ // taken above to force a restore from the earlier backup.
+ Serialization.deleteAllFiles(backupDir, Log.EXCLUDES);
+ for(File file : tempBackup.listFiles()) {
+ if(!file.getName().equals(Log.FILE_LOCK)) {
+ Files.copy(file, new File(backupDir, file.getName()));
+ }
+ }
+ channel = createFileChannel(overrides);
+ channel.start();
+ Assert.assertTrue(channel.isOpen());
+ checkIfBackupUsed(true);
+ Assert.assertEquals(100, channel.getLog().getPutCount());
+ Assert.assertEquals(20, channel.getLog().getCommittedCount());
+ Assert.assertEquals(100, channel.getLog().getTakeCount());
+ Assert.assertEquals(0, channel.getLog().getRollbackCount());
+ // Read count = 100 puts + 10 commits + 100 takes + 10 commits
+ Assert.assertEquals(220, channel.getLog().getReadCount());
+ consumeChannel(channel);
+ } finally {
+ FileUtils.deleteQuietly(dataDir);
+ FileUtils.deleteQuietly(tempBackup);
+ }
+ }
+
+ // Make sure data files required by the backup checkpoint are not deleted.
+ @Test
+ public void testDataFilesRequiredByBackupNotDeleted() throws Exception {
+ Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+ // Small max file size so the 100 events are spread over several data files.
+ overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+ channel = createFileChannel(overrides);
+ channel.start();
+ String prefix = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
+ Assert.assertTrue(channel.isOpen());
+ putEvents(channel, prefix, 10, 100);
+ Set<String> origFiles = Sets.newHashSet();
+ for(File dir : dataDirs) {
+ origFiles.addAll(Lists.newArrayList(dir.list()));
+ }
+ forceCheckpoint(channel);
+ takeEvents(channel, 10, 50);
+ long beforeSecondCheckpoint = System.currentTimeMillis();
+ forceCheckpoint(channel);
+ Set<String> newFiles = Sets.newHashSet();
+ int olderThanCheckpoint = 0;
+ int totalMetaFiles = 0;
+ for(File dir : dataDirs) {
+ File[] metadataFiles = dir.listFiles(new FilenameFilter() {
+ @Override
+ public boolean accept(File parent, String name) {
+ return name.endsWith(".meta");
+ }
+ });
+ // Sum over ALL data dirs so it is comparable with olderThanCheckpoint.
+ totalMetaFiles += metadataFiles.length;
+ for(File metadataFile : metadataFiles) {
+ if(metadataFile.lastModified() < beforeSecondCheckpoint) {
+ olderThanCheckpoint++;
+ }
+ }
+ newFiles.addAll(Lists.newArrayList(dir.list()));
+ }
+ /*
+ * Files which are not required by the new checkpoint should not have been
+ * modified by the checkpoint. NOTE(review): relies on lastModified()
+ * resolution being finer than the gap between the two checkpoints.
+ */
+ Assert.assertTrue(olderThanCheckpoint > 0);
+ Assert.assertTrue(totalMetaFiles != olderThanCheckpoint);
+
+ /*
+ * All files needed by original checkpoint should still be there.
+ */
+ Assert.assertTrue(newFiles.containsAll(origFiles));
+ takeEvents(channel, 10, 50);
+ forceCheckpoint(channel);
+ newFiles = Sets.newHashSet();
+ for(File dir : dataDirs) {
+ newFiles.addAll(Lists.newArrayList(dir.list()));
+ }
+ Assert.assertFalse(newFiles.containsAll(origFiles));
+ }
+
+ @Test (expected = IOException.class)
+ public void testSlowBackup() throws Throwable {
+ // With backups slowed down, the second checkpoint is expected to fail with
+ // an IOException (presumably because the first backup is still running).
+ Map<String, String> overrides = Maps.newHashMap();
+ overrides.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+ overrides.put(FileChannelConfiguration.MAX_FILE_SIZE, "1000");
+ channel = createFileChannel(overrides);
+ channel.start();
+ Assert.assertTrue(channel.isOpen());
+ Set<String> in = putEvents(channel, "restart", 10, 100);
+ Assert.assertEquals(100, in.size());
+ slowdownBackup(channel);
+ forceCheckpoint(channel);
+ in = putEvents(channel, "restart", 10, 100);
+ takeEvents(channel, 10, 100);
+ Assert.assertEquals(100, in.size());
+ try { forceCheckpoint(channel); } catch (ReflectionError ex) {
+ throw ex.getCause() != null ? ex.getCause() : ex; // never throw null (NPE would mask ex)
+ } finally {
+ channel.stop();
+ }
+ }
+
+ private static void slowdownBackup(FileChannel channel) {
+ // Walk FileChannel.log -> Log.queue -> FlumeEventQueue.backingStore via
+ // reflection, then set the backing store's slowdownBackup flag to true.
+ Log channelLog = field("log").ofType(Log.class).in(channel).get();
+ FlumeEventQueue eventQueue =
+ field("queue").ofType(FlumeEventQueue.class).in(channelLog).get();
+ EventQueueBackingStore store = field("backingStore")
+ .ofType(EventQueueBackingStore.class).in(eventQueue).get();
+ field("slowdownBackup")
+ .ofType(Boolean.class)
+ .in(store)
+ .set(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
index ba653e6..7c490b5 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestUtils.java
@@ -237,7 +237,7 @@ public class TestUtils {
} catch (Exception ex) {
transaction.rollback();
if(untilCapacityIsReached && ex instanceof ChannelException &&
- ("The channel has reached it's capacity. "
+ ("The channel has reached it's capacity. "
+ "This might be the result of a sink on the channel having too "
+ "low of batch size, a downstream system running slower than "
+ "normal, or that the channel capacity is just too low. "
@@ -260,10 +260,13 @@ public class TestUtils {
}
public static Context createFileChannelContext(String checkpointDir,
- String dataDir, Map<String, String> overrides) {
+ String dataDir, String backupDir, Map<String, String> overrides) {
Context context = new Context();
context.put(FileChannelConfiguration.CHECKPOINT_DIR,
checkpointDir);
+ if(backupDir != null) {
+ context.put(FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, backupDir);
+ }
context.put(FileChannelConfiguration.DATA_DIRS, dataDir);
context.put(FileChannelConfiguration.KEEP_ALIVE, String.valueOf(1));
context.put(FileChannelConfiguration.CAPACITY, String.valueOf(10000));
@@ -273,10 +276,16 @@ public class TestUtils {
return context;
}
public static FileChannel createFileChannel(String checkpointDir,
- String dataDir, Map<String, String> overrides) {
+ String dataDir, Map<String, String> overrides) {
+ return createFileChannel(checkpointDir, dataDir, null, overrides);
+ }
+
+ public static FileChannel createFileChannel(String checkpointDir,
+ String dataDir, String backupDir, Map<String, String> overrides) {
FileChannel channel = new FileChannel();
channel.setName("FileChannel-" + UUID.randomUUID());
- Context context = createFileChannelContext(checkpointDir, dataDir, overrides);
+ Context context = createFileChannelContext(checkpointDir, dataDir,
+ backupDir, overrides);
Configurables.configure(channel, context);
return channel;
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 16fba45..693c0d7 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1930,6 +1930,8 @@ Property Name Default Description
================================================ ================================ ========================================================
**type** -- The component type name, needs to be ``file``.
checkpointDir ~/.flume/file-channel/checkpoint The directory where checkpoint file will be stored
+useDualCheckpoints false Back up the checkpoint. If this is set to ``true``, ``backupCheckpointDir`` **must** be set
+backupCheckpointDir -- The directory where the checkpoint is backed up to. This directory **must not** be the same as the data directories or the checkpoint directory
dataDirs ~/.flume/file-channel/data The directory where log files will be stored
transactionCapacity 1000 The maximum size of transaction supported by the channel
checkpointInterval 30000 Amount of time (in millis) between checkpoints
[2/2] git commit: FLUME-1516: FileChannel Write Dual Checkpoints to
avoid replays
Posted by br...@apache.org.
FLUME-1516: FileChannel Write Dual Checkpoints to avoid replays
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6ca61680
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6ca61680
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6ca61680
Branch: refs/heads/trunk
Commit: 6ca616800ec897551fbb14959ce3a5f0c1d69aed
Parents: df7a197
Author: Brock Noland <br...@apache.org>
Authored: Fri Apr 5 13:26:56 2013 -0500
Committer: Brock Noland <br...@apache.org>
Committed: Fri Apr 5 13:26:56 2013 -0500
----------------------------------------------------------------------
.../flume/channel/file/EventQueueBackingStore.java | 2 +
.../file/EventQueueBackingStoreFactory.java | 21 +-
.../channel/file/EventQueueBackingStoreFile.java | 156 +++++++-
.../channel/file/EventQueueBackingStoreFileV3.java | 17 +-
.../org/apache/flume/channel/file/FileChannel.java | 117 +++---
.../channel/file/FileChannelConfiguration.java | 10 +
.../apache/flume/channel/file/FlumeEventQueue.java | 18 -
.../java/org/apache/flume/channel/file/Log.java | 183 ++++++--
.../org/apache/flume/channel/file/LogFile.java | 60 ++-
.../org/apache/flume/channel/file/LogFileV3.java | 41 +-
.../apache/flume/channel/file/ReplayHandler.java | 69 ++--
.../apache/flume/channel/file/Serialization.java | 105 ++++-
.../flume/channel/file/proto/ProtosFactory.java | 146 ++++++-
.../src/main/proto/filechannel.proto | 2 +
.../flume/channel/file/TestFileChannelBase.java | 7 +-
.../flume/channel/file/TestFileChannelRestart.java | 356 ++++++++++++++-
.../org/apache/flume/channel/file/TestUtils.java | 17 +-
flume-ng-doc/sphinx/FlumeUserGuide.rst | 2 +
18 files changed, 1104 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index b136eb0..2726095 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -29,6 +29,8 @@ abstract class EventQueueBackingStore {
private long logWriteOrderID;
private final int capacity;
private final String name;
+ public static final String BACKUP_COMPLETE_FILENAME = "backupComplete";
+ protected Boolean slowdownBackup = false;
protected EventQueueBackingStore(int capacity, String name) {
this.capacity = capacity;
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index a19bdb4..07a3781 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -35,8 +35,14 @@ class EventQueueBackingStoreFactory {
String name) throws Exception {
return get(checkpointFile, capacity, name, true);
}
+
static EventQueueBackingStore get(File checkpointFile, int capacity,
String name, boolean upgrade) throws Exception {
+ return get(checkpointFile, null, capacity, name, upgrade, false);
+ }
+ static EventQueueBackingStore get(File checkpointFile,
+ File backupCheckpointDir, int capacity,String name,
+ boolean upgrade, boolean shouldBackup) throws Exception {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
RandomAccessFile checkpointFileHandle = null;
try {
@@ -61,17 +67,20 @@ class EventQueueBackingStoreFactory {
if(!checkpointFile.createNewFile()) {
throw new IOException("Cannot create " + checkpointFile);
}
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile,
+ capacity, name, backupCheckpointDir, shouldBackup);
}
// v3 due to meta file, version will be checked by backing store
if(metaDataExists) {
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
+ name, backupCheckpointDir, shouldBackup);
}
checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
int version = (int)checkpointFileHandle.readLong();
if(Serialization.VERSION_2 == version) {
if(upgrade) {
- return upgrade(checkpointFile, capacity, name);
+ return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
+ shouldBackup);
}
return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
}
@@ -91,7 +100,8 @@ class EventQueueBackingStoreFactory {
}
private static EventQueueBackingStore upgrade(File checkpointFile,
- int capacity, String name)
+ int capacity, String name, File backupCheckpointDir,
+ boolean shouldBackup)
throws Exception {
LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
EventQueueBackingStoreFileV2 backingStoreV2 =
@@ -103,7 +113,8 @@ class EventQueueBackingStoreFactory {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
metaDataFile);
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+ backupCheckpointDir, shouldBackup);
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 4115505..5884800 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -28,8 +28,14 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,11 +62,25 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
protected final MappedByteBuffer mappedBuffer;
protected final RandomAccessFile checkpointFileHandle;
protected final File checkpointFile;
+ private final Semaphore backupCompletedSema = new Semaphore(1);
+ protected final boolean shouldBackup;
+ private final File backupDir;
+ private final ExecutorService checkpointBackUpExecutor;
+ /**
+ * Creates a backing store with checkpoint backups disabled: delegates with a
+ * null backup directory and shouldBackup = false.
+ */
protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile) throws IOException, BadCheckpointException {
+ File checkpointFile) throws IOException,
+ BadCheckpointException {
+ this(capacity, name, checkpointFile, null, false);
+ }
+
+ protected EventQueueBackingStoreFile(int capacity, String name,
+ File checkpointFile, File checkpointBackupDir,
+ boolean backupCheckpoint) throws IOException,
+ BadCheckpointException {
super(capacity, name);
this.checkpointFile = checkpointFile;
+ this.shouldBackup = backupCheckpoint;
+ this.backupDir = checkpointBackupDir;
checkpointFileHandle = new RandomAccessFile(checkpointFile, "rw");
long totalBytes = (capacity + HEADER_SIZE) * Serialization.SIZE_OF_LONG;
if(checkpointFileHandle.length() == 0) {
@@ -95,6 +115,13 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
+ " probably because the agent stopped while the channel was"
+ " checkpointing.");
}
+ if (shouldBackup) {
+ checkpointBackUpExecutor = Executors.newSingleThreadExecutor(
+ new ThreadFactoryBuilder().setNameFormat(
+ getName() + " - CheckpointBackUpThread").build());
+ } else {
+ checkpointBackUpExecutor = null;
+ }
}
protected long getCheckpointLogWriteOrderID() {
@@ -103,11 +130,104 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
protected abstract void writeCheckpointMetaData() throws IOException;
+  /**
+   * Backs up the checkpoint directory into {@code backupDirectory}. This is
+   * called from the background backup thread once the checkpoint has been
+   * completely written, while the file channel continues operating.
+   *
+   * The backup-complete marker is deleted before any file is copied and is
+   * re-created only after every file has been copied, so a backup that stopped
+   * part way through is never reported by backupExists(). Files named in
+   * Log.EXCLUDES (the directory lock file) are not copied.
+   *
+   * @param backupDirectory - the directory to which the backup files should be
+   * copied.
+   * @throws IOException - if the copy failed, or if the previous
+   * backup-complete marker could not be removed.
+   */
+  protected void backupCheckpoint(File backupDirectory) throws IOException {
+    // beginCheckpoint() already drained the single backup permit, so none
+    // should be left here; the backup task releases it when we return.
+    int availablePermits = backupCompletedSema.drainPermits();
+    Preconditions.checkState(availablePermits == 0,
+        "Expected no permits to be available in the backup semaphore, " +
+        "but " + availablePermits + " permits were available.");
+    if (slowdownBackup) {
+      try {
+        TimeUnit.SECONDS.sleep(10);
+      } catch (InterruptedException ex) {
+        // Keep the executor thread's interrupt status before failing.
+        Thread.currentThread().interrupt();
+        throw Throwables.propagate(ex);
+      }
+    }
+    File backupFile = new File(backupDirectory, BACKUP_COMPLETE_FILENAME);
+    if (backupExists(backupDirectory) && !backupFile.delete()) {
+      throw new IOException("Error while doing backup of checkpoint. Could " +
+          "not remove " + backupFile.toString() + ".");
+    }
+    Serialization.deleteAllFiles(backupDirectory, Log.EXCLUDES);
+    File checkpointDir = checkpointFile.getParentFile();
+    File[] checkpointFiles = checkpointDir.listFiles();
+    Preconditions.checkNotNull(checkpointFiles, "Could not retrieve files " +
+        "from the checkpoint directory. Cannot complete backup of the " +
+        "checkpoint.");
+    for (File origFile : checkpointFiles) {
+      String fileName = origFile.getName();
+      // Same exclusion set used when clearing directories (the lock file).
+      if (Log.EXCLUDES.contains(fileName)) {
+        continue;
+      }
+      Serialization.copyFile(origFile, new File(backupDirectory, fileName));
+    }
+    Preconditions.checkState(!backupFile.exists(), "The backup file exists " +
+        "while it is not supposed to. Are multiple channels configured to use " +
+        "this directory: " + backupDirectory.toString() + " as backup?");
+    if (!backupFile.createNewFile()) {
+      LOG.error("Could not create backup file. Backup of checkpoint will " +
+          "not be used during replay even if checkpoint is bad.");
+    }
+  }
+
+  /**
+   * Restores the checkpoint directory from a completed backup. Used when the
+   * checkpoint is found to be bad.
+   *
+   * The backup directory is listed before the checkpoint directory is
+   * cleared, so an unreadable backup directory leaves the existing checkpoint
+   * files in place for the caller's own recovery path.
+   *
+   * @param checkpointDir - the checkpoint directory to restore into.
+   * @param backupDir - the directory holding the checkpoint backup.
+   * @return true - if the previous backup was successfully completed and
+   * restore was successfully completed; false if no completed backup exists
+   * or the backup directory could not be listed.
+   * @throws IOException - if the checkpoint directory could not be cleared,
+   * or if restore failed while copying a backup file.
+   */
+  public static boolean restoreBackup(File checkpointDir, File backupDir)
+      throws IOException {
+    if (!backupExists(backupDir)) {
+      return false;
+    }
+    File[] backupFiles = backupDir.listFiles();
+    if (backupFiles == null) {
+      return false;
+    }
+    // Treat a failed clear as fatal, like the full-replay path in Log does.
+    if (!Serialization.deleteAllFiles(checkpointDir, Log.EXCLUDES)) {
+      throw new IOException("Could not delete files in checkpoint directory " +
+          checkpointDir + " before restoring the backup from " + backupDir);
+    }
+    for (File backupFile : backupFiles) {
+      String fileName = backupFile.getName();
+      // The completion marker and lock file describe the backup directory
+      // itself and must not be copied into the checkpoint directory.
+      if (!fileName.equals(BACKUP_COMPLETE_FILENAME) &&
+          !fileName.equals(Log.FILE_LOCK)) {
+        Serialization.copyFile(backupFile, new File(checkpointDir, fileName));
+      }
+    }
+    return true;
+  }
+
@Override
void beginCheckpoint() throws IOException {
LOG.info("Start checkpoint for " + checkpointFile +
", elements to sync = " + overwriteMap.size());
+ if (shouldBackup) {
+ int permits = backupCompletedSema.drainPermits();
+ Preconditions.checkState(permits <= 1, "Expected only one or less " +
+ "permits to checkpoint, but got " + String.valueOf(permits) +
+ " permits");
+ if(permits < 1) {
+ // Force the checkpoint to not happen by throwing an exception.
+ throw new IOException("Previous backup of checkpoint files is still " +
+ "in progress. Will attempt to checkpoint only at the end of the " +
+ "next checkpoint interval. Try increasing the checkpoint interval " +
+ "if this error happens often.");
+ }
+ }
// Start checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
mappedBuffer.force();
@@ -141,8 +261,38 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
// Finish checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_COMPLETE);
mappedBuffer.force();
+ if (shouldBackup) {
+ startBackupThread();
+ }
}
+  /**
+   * Submits a backup of the just-completed checkpoint to the single-threaded
+   * backup executor, so the channel keeps running while files are copied.
+   * The backup permit drained in beginCheckpoint() is released when the
+   * backup task finishes, whether it succeeded or failed. This keeps a failed
+   * backup from making every later checkpoint fail in beginCheckpoint().
+   */
+  private void startBackupThread() {
+    Preconditions.checkNotNull(checkpointBackUpExecutor,
+        "Expected the checkpoint backup executor to be non-null, " +
+        "but it is null. Checkpoint will not be backed up.");
+    LOG.info("Attempting to back up checkpoint.");
+    checkpointBackUpExecutor.submit(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          backupCheckpoint(backupDir);
+        } catch (Throwable throwable) {
+          LOG.error("Backing up of checkpoint directory failed.", throwable);
+          return;
+        } finally {
+          backupCompletedSema.release();
+        }
+        LOG.info("Checkpoint backup completed.");
+      }
+    });
+  }
@Override
void close() {
@@ -242,6 +392,10 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
}
}
+  /**
+   * Checks whether {@code backupDir} holds a completed checkpoint backup,
+   * i.e. whether its backup-complete marker file exists.
+   *
+   * @param backupDir - the backup directory. It may be null when dual
+   * checkpointing is disabled, in which case false is returned. A null
+   * parent would otherwise resolve the marker against the working directory.
+   * @return true if the backup-complete marker exists in backupDir.
+   */
+  public static boolean backupExists(File backupDir) {
+    return backupDir != null &&
+        new File(backupDir, BACKUP_COMPLETE_FILENAME).exists();
+  }
+
public static void main(String[] args) throws Exception {
File file = new File(args[0]);
File inflightTakesFile = new File(args[1]);
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index 451a9d4..c153558 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -38,9 +38,15 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
.getLogger(EventQueueBackingStoreFileV3.class);
private final File metaDataFile;
- EventQueueBackingStoreFileV3(File checkpointFile, int capacity, String name)
- throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile);
+ /**
+ * Creates a V3 backing store with checkpoint backups disabled: delegates
+ * with a null backup directory and backupCheckpoint = false.
+ */
+ EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+ String name) throws IOException, BadCheckpointException {
+ this(checkpointFile, capacity, name, null, false);
+ }
+
+ EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
+ String name, File checkpointBackupDir,
+ boolean backupCheckpoint) throws IOException, BadCheckpointException {
+ super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
metaDataFile = Serialization.getMetaDataFile(checkpointFile);
@@ -89,6 +95,11 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
}
}
} else {
+ if(backupExists(checkpointBackupDir) && shouldBackup) {
+ // If a backup exists, then throw an exception to recover checkpoint
+ throw new BadCheckpointException("The checkpoint metadata file does " +
+ "not exist, but a backup exists");
+ }
ProtosFactory.Checkpoint.Builder checkpointBuilder =
ProtosFactory.Checkpoint.newBuilder();
checkpointBuilder.setVersion(getVersion());
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
index ff42d19..a7aa70c 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannel.java
@@ -19,22 +19,17 @@
package org.apache.flume.channel.file;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
+import org.apache.flume.annotations.Disposable;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.annotations.Disposable;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.channel.file.Log.Builder;
@@ -45,8 +40,12 @@ import org.apache.flume.instrumentation.ChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
/**
* <p>
@@ -83,6 +82,7 @@ public class FileChannel extends BasicChannelSemantics {
private long maxFileSize;
private long minimumRequiredSpace;
private File checkpointDir;
+ private File backupCheckpointDir;
private File[] dataDirs;
private Log log;
private volatile boolean open;
@@ -99,6 +99,7 @@ public class FileChannel extends BasicChannelSemantics {
private KeyProvider encryptionKeyProvider;
private String encryptionActiveKey;
private String encryptionCipherProvider;
+ private boolean useDualCheckpoints;
@Override
public synchronized void setName(String name) {
@@ -109,60 +110,51 @@ public class FileChannel extends BasicChannelSemantics {
@Override
public void configure(Context context) {
+ useDualCheckpoints = context.getBoolean(
+ FileChannelConfiguration.USE_DUAL_CHECKPOINTS,
+ FileChannelConfiguration.DEFAULT_USE_DUAL_CHECKPOINTS);
String homePath = System.getProperty("user.home").replace('\\', '/');
String strCheckpointDir =
context.getString(FileChannelConfiguration.CHECKPOINT_DIR,
homePath + "/.flume/file-channel/checkpoint");
+ String strBackupCheckpointDir = context.getString
+ (FileChannelConfiguration.BACKUP_CHECKPOINT_DIR, "").trim();
+
String[] strDataDirs = context.getString(FileChannelConfiguration.DATA_DIRS,
homePath + "/.flume/file-channel/data").split(",");
- if(checkpointDir == null) {
- checkpointDir = new File(strCheckpointDir);
- } else if(!checkpointDir.getAbsolutePath().
- equals(new File(strCheckpointDir).getAbsolutePath())) {
- LOG.warn("An attempt was made to change the checkpoint " +
- "directory after start, this is not supported.");
+ checkpointDir = new File(strCheckpointDir);
+
+ if (useDualCheckpoints) {
+ Preconditions.checkState(!strBackupCheckpointDir.isEmpty(),
+ "Dual checkpointing is enabled, but the backup directory is not set. " +
+ "Please set " + FileChannelConfiguration.BACKUP_CHECKPOINT_DIR + " " +
+ "to enable dual checkpointing");
+ backupCheckpointDir = new File(strBackupCheckpointDir);
+ /*
+ * If the backup directory is the same as the checkpoint directory,
+ * then throw an exception and force the config system to ignore this
+ * channel.
+ */
+ Preconditions.checkState(!backupCheckpointDir.equals(checkpointDir),
+ "Could not configure " + getName() + ". The checkpoint backup " +
+ "directory and the checkpoint directory are " +
+ "configured to be the same.");
}
- if(dataDirs == null) {
- dataDirs = new File[strDataDirs.length];
- for (int i = 0; i < strDataDirs.length; i++) {
- dataDirs[i] = new File(strDataDirs[i]);
- }
- } else {
- boolean changed = false;
- if(dataDirs.length != strDataDirs.length) {
- changed = true;
- } else {
- for (int i = 0; i < strDataDirs.length; i++) {
- if(!dataDirs[i].getAbsolutePath().
- equals(new File(strDataDirs[i]).getAbsolutePath())) {
- changed = true;
- break;
- }
- }
- }
- if(changed) {
- LOG.warn("An attempt was made to change the data " +
- "directories after start, this is not supported.");
- }
+
+ dataDirs = new File[strDataDirs.length];
+ for (int i = 0; i < strDataDirs.length; i++) {
+ dataDirs[i] = new File(strDataDirs[i]);
}
- int newCapacity = context.getInteger(FileChannelConfiguration.CAPACITY,
+ capacity = context.getInteger(FileChannelConfiguration.CAPACITY,
FileChannelConfiguration.DEFAULT_CAPACITY);
- if(newCapacity <= 0 && capacity == 0) {
- newCapacity = FileChannelConfiguration.DEFAULT_CAPACITY;
+ if(capacity <= 0) {
+ capacity = FileChannelConfiguration.DEFAULT_CAPACITY;
LOG.warn("Invalid capacity specified, initializing channel to "
- + "default capacity of {}", newCapacity);
- }
- if(capacity > 0 && newCapacity != capacity) {
- LOG.warn("Capacity of this channel cannot be sized on the fly due " +
- "the requirement we have enough DirectMemory for the queue and " +
- "downsizing of the queue cannot be guranteed due to the " +
- "fact there maybe more items on the queue than the new capacity.");
- } else {
- capacity = newCapacity;
+ + "default capacity of {}", capacity);
}
keepAlive =
@@ -181,8 +173,8 @@ public class FileChannel extends BasicChannelSemantics {
}
Preconditions.checkState(transactionCapacity <= capacity,
- "File Channel transaction capacity cannot be greater than the " +
- "capacity of the channel.");
+ "File Channel transaction capacity cannot be greater than the " +
+ "capacity of the channel.");
checkpointInterval =
context.getLong(FileChannelConfiguration.CHECKPOINT_INTERVAL,
@@ -303,6 +295,8 @@ public class FileChannel extends BasicChannelSemantics {
builder.setEncryptionKeyProvider(encryptionKeyProvider);
builder.setEncryptionKeyAlias(encryptionActiveKey);
builder.setEncryptionCipherProvider(encryptionCipherProvider);
+ builder.setUseDualCheckpoints(useDualCheckpoints);
+ builder.setBackupCheckpointDir(backupCheckpointDir);
log = builder.build();
log.replay();
open = true;
@@ -402,6 +396,23 @@ public class FileChannel extends BasicChannelSemantics {
}
/**
+   * Reports whether this channel was restarted from a restored backup of
+   * its checkpoint.
+   * @return true if the log restored the checkpoint from the backup
+   * directory during replay; false otherwise, including when no log has
+   * been built yet.
+   */
+  @VisibleForTesting
+  boolean checkpointBackupRestored() {
+    return log != null && log.backupRestored();
+  }
+
+  /**
+   * Exposes the underlying Log for tests. This is null until the channel
+   * has built its log.
+   */
+  @VisibleForTesting
+  Log getLog() {
+    return this.log;
+  }
+
+ /**
* Transaction backed by a file. This transaction supports either puts
* or takes but not both.
*/
@@ -462,7 +473,7 @@ public class FileChannel extends BasicChannelSemantics {
}
FlumeEventPointer ptr = log.put(transactionID, event);
Preconditions.checkState(putList.offer(ptr), "putList offer failed "
- + channelNameDescriptor);
+ + channelNameDescriptor);
queue.addWithoutCommit(ptr, transactionID);
success = true;
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
index 24368b3..c2dcffc 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FileChannelConfiguration.java
@@ -23,6 +23,12 @@ public class FileChannelConfiguration {
* Directory Checkpoints will be written in
*/
public static final String CHECKPOINT_DIR = "checkpointDir";
+
+ /**
+ * The directory to which the checkpoint is backed up after every completed
+ * checkpoint when {@link #USE_DUAL_CHECKPOINTS} is enabled. It is required in
+ * that case and must differ from {@link #CHECKPOINT_DIR}.
+ */
+ public static final String BACKUP_CHECKPOINT_DIR = "backupCheckpointDir";
+
/**
* Directories data files will be written in. Multiple directories
* can be specified as comma separated values. Writes will
@@ -90,4 +96,8 @@ public class FileChannelConfiguration {
public static final String USE_FAST_REPLAY = "use-fast-replay";
public static final boolean DEFAULT_USE_FAST_REPLAY = false;
+
+ /**
+ * Whether each completed checkpoint is also copied to
+ * {@link #BACKUP_CHECKPOINT_DIR}. When it is, a bad checkpoint can be
+ * restored from the backup instead of forcing a full replay of the logs.
+ * Defaults to {@link #DEFAULT_USE_DUAL_CHECKPOINTS} (false).
+ */
+ public static final String USE_DUAL_CHECKPOINTS = "useDualCheckpoints";
+ public static final boolean DEFAULT_USE_DUAL_CHECKPOINTS = false;
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 1ed9547..ac03fb4 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -30,12 +30,7 @@ import java.util.Collection;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -394,19 +389,6 @@ final class FlumeEventQueue {
* asynchronously written to disk.
*/
public void serializeAndWrite() throws Exception {
- //Check if there is a current write happening, if there is abort it.
- if (future != null) {
- try {
- future.cancel(true);
- } catch (Exception e) {
- LOG.warn("Interrupted a write to inflights "
- + "file: " + inflightEventsFile.getName()
- + " to start a new write.");
- }
- while (!future.isDone()) {
- TimeUnit.MILLISECONDS.sleep(100);
- }
- }
Collection<Long> values = inflightEvents.values();
if(!fileChannel.isOpen()){
file = new RandomAccessFile(inflightEventsFile, "rw");
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 6ffc824..e61437d 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -18,6 +18,23 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.io.FileUtils;
+import org.apache.flume.ChannelException;
+import org.apache.flume.Event;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.annotations.InterfaceStability;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -31,6 +48,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Executors;
@@ -42,23 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-import javax.annotation.Nullable;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flume.ChannelException;
-import org.apache.flume.Event;
-import org.apache.flume.annotations.InterfaceAudience;
-import org.apache.flume.annotations.InterfaceStability;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
/**
* Stores FlumeEvents on disk and pointers to the events in a in memory queue.
* Once a log object is created the replay method should be called to reconcile
@@ -76,12 +77,13 @@ class Log {
public static final String PREFIX = "log-";
private static final Logger LOGGER = LoggerFactory.getLogger(Log.class);
private static final int MIN_NUM_LOGS = 2;
- private static final String FILE_LOCK = "in_use.lock";
+ public static final String FILE_LOCK = "in_use.lock";
// for reader
private final Map<Integer, LogFile.RandomReader> idLogFileMap = Collections
.synchronizedMap(new HashMap<Integer, LogFile.RandomReader>());
private final AtomicInteger nextFileID = new AtomicInteger(0);
private final File checkpointDir;
+ private final File backupCheckpointDir;
private final File[] logDirs;
private final int queueCapacity;
private final AtomicReferenceArray<LogFile.Writer> logFiles;
@@ -97,6 +99,11 @@ class Log {
private final Map<String, FileLock> locks;
private final ReentrantReadWriteLock checkpointLock =
new ReentrantReadWriteLock(true);
+
+  /**
+   * Names of files excluded from checkpoint backups and restores, e.g. when
+   * Serialization.deleteAllFiles clears a checkpoint or backup directory.
+   * Currently this is only the directory lock file. The set is unmodifiable
+   * because this static field is shared by every channel in the JVM.
+   */
+  public static final Set<String> EXCLUDES =
+      Collections.unmodifiableSet(Sets.newHashSet(FILE_LOCK));
/**
* Shared lock
*/
@@ -115,6 +122,16 @@ class Log {
private Key encryptionKey;
private final long usableSpaceRefreshInterval;
private boolean didFastReplay = false;
+ private final boolean useDualCheckpoints;
+ private volatile boolean backupRestored = false;
+
+ private int readCount;
+ private int putCount;
+ private int takeCount;
+ private int committedCount;
+ private int rollbackCount;
+
+ private final List<File> pendingDeletes = Lists.newArrayList();
static class Builder {
private long bCheckpointInterval;
@@ -134,6 +151,8 @@ class Log {
private String bEncryptionKeyAlias;
private String bEncryptionCipherProvider;
private long bUsableSpaceRefreshInterval = 15L * 1000L;
+ private boolean bUseDualCheckpoints = false;
+ private File bBackupCheckpointDir = null;
Builder setUsableSpaceRefreshInterval(long usableSpaceRefreshInterval) {
bUsableSpaceRefreshInterval = usableSpaceRefreshInterval;
@@ -210,9 +229,20 @@ class Log {
return this;
}
+    /**
+     * Enables backing up every completed checkpoint. When enabled, a non-null
+     * backup checkpoint directory must also be set, or Log construction fails.
+     *
+     * @param useDualCheckpoints whether checkpoints should be backed up
+     * @return this builder
+     */
+    Builder setUseDualCheckpoints(boolean useDualCheckpoints) {
+      this.bUseDualCheckpoints = useDualCheckpoints;
+      return this;
+    }
+
+    /**
+     * Sets the directory checkpoints are backed up to. It is only used, and
+     * then required, when dual checkpointing is enabled.
+     *
+     * @param backupCheckpointDir the checkpoint backup directory
+     * @return this builder
+     */
+    Builder setBackupCheckpointDir(File backupCheckpointDir) {
+      bBackupCheckpointDir = backupCheckpointDir;
+      return this;
+    }
+
Log build() throws IOException {
return new Log(bCheckpointInterval, bMaxFileSize, bQueueCapacity,
- bLogWriteTimeout, bCheckpointWriteTimeout, bCheckpointDir, bName,
+ bLogWriteTimeout, bCheckpointWriteTimeout, bUseDualCheckpoints,
+ bCheckpointDir, bBackupCheckpointDir, bName,
useLogReplayV1, useFastReplay, bMinimumRequiredSpace,
bEncryptionKeyProvider, bEncryptionKeyAlias,
bEncryptionCipherProvider, bUsableSpaceRefreshInterval,
@@ -221,7 +251,8 @@ class Log {
}
private Log(long checkpointInterval, long maxFileSize, int queueCapacity,
- int logWriteTimeout, int checkpointWriteTimeout, File checkpointDir,
+ int logWriteTimeout, int checkpointWriteTimeout,
+ boolean useDualCheckpoints, File checkpointDir, File backupCheckpointDir,
String name, boolean useLogReplayV1, boolean useFastReplay,
long minimumRequiredSpace, @Nullable KeyProvider encryptionKeyProvider,
@Nullable String encryptionKeyAlias,
@@ -229,15 +260,23 @@ class Log {
long usableSpaceRefreshInterval, File... logDirs)
throws IOException {
Preconditions.checkArgument(checkpointInterval > 0,
- "checkpointInterval <= 0");
+ "checkpointInterval <= 0");
Preconditions.checkArgument(queueCapacity > 0, "queueCapacity <= 0");
Preconditions.checkArgument(maxFileSize > 0, "maxFileSize <= 0");
Preconditions.checkNotNull(checkpointDir, "checkpointDir");
Preconditions.checkArgument(usableSpaceRefreshInterval > 0,
"usableSpaceRefreshInterval <= 0");
Preconditions.checkArgument(
- checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
- + checkpointDir + " could not be created");
+ checkpointDir.isDirectory() || checkpointDir.mkdirs(), "CheckpointDir "
+ + checkpointDir + " could not be created");
+ if (useDualCheckpoints) {
+ Preconditions.checkNotNull(backupCheckpointDir, "backupCheckpointDir is" +
+ " null while dual checkpointing is enabled.");
+ Preconditions.checkArgument(
+ backupCheckpointDir.isDirectory() || backupCheckpointDir.mkdirs(),
+ "Backup CheckpointDir " + backupCheckpointDir +
+ " could not be created");
+ }
Preconditions.checkNotNull(logDirs, "logDirs");
Preconditions.checkArgument(logDirs.length > 0, "logDirs empty");
Preconditions.checkArgument(name != null && !name.trim().isEmpty(),
@@ -255,6 +294,9 @@ class Log {
locks = Maps.newHashMap();
try {
lock(checkpointDir);
+ if(useDualCheckpoints) {
+ lock(backupCheckpointDir);
+ }
for (File logDir : logDirs) {
lock(logDir);
}
@@ -288,13 +330,15 @@ class Log {
this.checkpointInterval = Math.max(checkpointInterval, 1000);
this.maxFileSize = maxFileSize;
this.queueCapacity = queueCapacity;
+ this.useDualCheckpoints = useDualCheckpoints;
this.checkpointDir = checkpointDir;
+ this.backupCheckpointDir = backupCheckpointDir;
this.logDirs = logDirs;
this.logWriteTimeout = logWriteTimeout;
this.checkpointWriteTimeout = checkpointWriteTimeout;
logFiles = new AtomicReferenceArray<LogFile.Writer>(this.logDirs.length);
workerExecutor = Executors.newSingleThreadScheduledExecutor(new
- ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
+ ThreadFactoryBuilder().setNameFormat("Log-BackgroundWorker-" + name)
.build());
workerExecutor.scheduleWithFixedDelay(new BackgroundWorker(this),
this.checkpointInterval, this.checkpointInterval,
@@ -365,8 +409,9 @@ class Log {
try {
backingStore =
- EventQueueBackingStoreFactory.get(checkpointFile, queueCapacity,
- channelNameDescriptor);
+ EventQueueBackingStoreFactory.get(checkpointFile,
+ backupCheckpointDir, queueCapacity, channelNameDescriptor,
+ true, this.useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile);
LOGGER.info("Last Checkpoint " + new Date(checkpointFile.lastModified())
@@ -383,14 +428,26 @@ class Log {
*/
doReplay(queue, dataFiles, encryptionKeyProvider, shouldFastReplay);
} catch (BadCheckpointException ex) {
- LOGGER.warn("Checkpoint may not have completed successfully. "
- + "Forcing full replay, this may take a while.", ex);
- if(!Serialization.deleteAllFiles(checkpointDir)) {
- throw new IOException("Could not delete files in checkpoint " +
- "directory to recover from a corrupt or incomplete checkpoint");
+ backupRestored = false;
+ if (useDualCheckpoints) {
+ LOGGER.warn("Checkpoint may not have completed successfully. "
+ + "Restoring checkpoint and starting up.", ex);
+ if (EventQueueBackingStoreFile.backupExists(backupCheckpointDir)) {
+ backupRestored = EventQueueBackingStoreFile.restoreBackup(
+ checkpointDir, backupCheckpointDir);
+ }
+ }
+ if (!backupRestored) {
+ LOGGER.warn("Checkpoint may not have completed successfully. "
+ + "Forcing full replay, this may take a while.", ex);
+ if (!Serialization.deleteAllFiles(checkpointDir, EXCLUDES)) {
+ throw new IOException("Could not delete files in checkpoint " +
+ "directory to recover from a corrupt or incomplete checkpoint");
+ }
}
backingStore = EventQueueBackingStoreFactory.get(checkpointFile,
- queueCapacity, channelNameDescriptor);
+ backupCheckpointDir,
+ queueCapacity, channelNameDescriptor, true, useDualCheckpoints);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile);
// If the checkpoint was deleted due to BadCheckpointException, then
@@ -441,6 +498,11 @@ class Log {
LOGGER.info("Replaying logs with v2 replay logic");
replayHandler.replayLog(dataFiles);
}
+ readCount = replayHandler.getReadCount();
+ putCount = replayHandler.getPutCount();
+ takeCount = replayHandler.getTakeCount();
+ rollbackCount = replayHandler.getRollbackCount();
+ committedCount = replayHandler.getCommitCount();
}
}
@@ -448,6 +510,36 @@ class Log {
boolean didFastReplay() {
return didFastReplay;
}
+
+  /** @return the read count reported by the replay handler after replay. */
+  @VisibleForTesting
+  public int getReadCount() {
+    return this.readCount;
+  }
+
+  /** @return the put count reported by the replay handler after replay. */
+  @VisibleForTesting
+  public int getPutCount() {
+    return this.putCount;
+  }
+
+  /** @return the take count reported by the replay handler after replay. */
+  @VisibleForTesting
+  public int getTakeCount() {
+    return this.takeCount;
+  }
+
+  /** @return the commit count reported by the replay handler after replay. */
+  @VisibleForTesting
+  public int getCommittedCount() {
+    return this.committedCount;
+  }
+
+  /** @return the rollback count reported by the replay handler after replay. */
+  @VisibleForTesting
+  public int getRollbackCount() {
+    return this.rollbackCount;
+  }
+
+  /**
+   * Reports whether replay started from a restored checkpoint backup.
+   * @return true if, after the checkpoint was found to be bad, it was
+   * restored from the backup checkpoint directory before replaying.
+   */
+  @VisibleForTesting
+  boolean backupRestored() {
+    return this.backupRestored;
+  }
int getNextFileID() {
Preconditions.checkState(open, "Log is closed");
@@ -704,6 +796,13 @@ class Log {
} catch (IOException ex) {
LOGGER.warn("Error unlocking " + checkpointDir, ex);
}
+ if (useDualCheckpoints) {
+ try { // release the backup checkpoint directory as well
+ unlock(backupCheckpointDir);
+ } catch (IOException ex) {
+ LOGGER.warn("Error unlocking " + backupCheckpointDir, ex);
+ }
+ }
for (File logDir : logDirs) {
try {
unlock(logDir);
@@ -942,6 +1041,17 @@ class Log {
private void removeOldLogs(SortedSet<Integer> fileIDs) {
Preconditions.checkState(open, "Log is closed");
+ // To maintain a single code path for deletes, if backup of checkpoint is
+ // enabled or not, we will track the files which can be deleted after the
+ // current checkpoint (since the one which just got backed up still needs
+ // these files) and delete them only after the next (since the current
+ // checkpoint will become the backup at that time,
+ // and thus these files are no longer needed).
+ for(File fileToDelete : pendingDeletes) {
+ LOGGER.info("Removing old file: " + fileToDelete);
+ // deleteQuietly never throws; surface failures for files that still exist
+ // (metadata files may legitimately be absent, so only warn if present).
+ if (!FileUtils.deleteQuietly(fileToDelete) && fileToDelete.exists()) {
+ LOGGER.warn("Could not remove old file: " + fileToDelete);
+ }
+ }
+ pendingDeletes.clear();
// we will find the smallest fileID currently in use and
// won't delete any files with an id larger than the min
int minFileID = fileIDs.first();
@@ -960,14 +1070,9 @@ class Log {
if(reader != null) {
reader.close();
}
- LOGGER.info("Removing old log " + logFile +
- ", result = " + logFile.delete() + ", minFileID "
- + minFileID);
File metaDataFile = Serialization.getMetaDataFile(logFile);
- if(metaDataFile.exists() && !metaDataFile.delete()) {
- LOGGER.warn("Could not remove metadata file "
- + metaDataFile + " for " + logFile);
- }
+ pendingDeletes.add(logFile);
+ pendingDeletes.add(metaDataFile);
}
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
index 1db3717..d3db896 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFile.java
@@ -18,6 +18,17 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.tools.DirectMemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -29,19 +40,6 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.apache.flume.tools.DirectMemoryUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
abstract class LogFile {
private static final Logger LOG = LoggerFactory
@@ -420,6 +418,8 @@ abstract class LogFile {
private int logFileID;
private long lastCheckpointPosition;
private long lastCheckpointWriteOrderID;
+ private long backupCheckpointPosition;
+ private long backupCheckpointWriteOrderID;
/**
* Construct a Sequential Log Reader object
@@ -444,6 +444,14 @@ abstract class LogFile {
protected void setLastCheckpointWriteOrderID(long lastCheckpointWriteOrderID) {
this.lastCheckpointWriteOrderID = lastCheckpointWriteOrderID;
}
+ protected void setPreviousCheckpointPosition(
+ long backupCheckpointPosition) {
+ this.backupCheckpointPosition = backupCheckpointPosition;
+ }
+ protected void setPreviousCheckpointWriteOrderID(
+ long backupCheckpointWriteOrderID) {
+ this.backupCheckpointWriteOrderID = backupCheckpointWriteOrderID;
+ }
protected void setLogFileID(int logFileID) {
this.logFileID = logFileID;
Preconditions.checkArgument(logFileID >= 0, "LogFileID is not positive: "
@@ -459,18 +467,24 @@ abstract class LogFile {
int getLogFileID() {
return logFileID;
}
+
void skipToLastCheckpointPosition(long checkpointWriteOrderID)
- throws IOException {
- if (lastCheckpointPosition > 0L
- && lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
- LOG.info("fast-forward to checkpoint position: "
- + lastCheckpointPosition);
- fileChannel.position(lastCheckpointPosition);
+ throws IOException {
+ if (lastCheckpointPosition > 0L) {
+ // Prefer the latest checkpoint; if the queue's checkpoint predates it,
+ // fall back to the backup (previous) checkpoint recorded for this file.
+ long position = 0L;
+ if (lastCheckpointWriteOrderID <= checkpointWriteOrderID) {
+ position = lastCheckpointPosition;
+ } else if (backupCheckpointWriteOrderID <= checkpointWriteOrderID
+ && backupCheckpointPosition > 0) {
+ position = backupCheckpointPosition;
+ }
+ if (position > 0L) {
+ fileChannel.position(position);
+ LOG.info("fast-forward to checkpoint position: " + position);
+ } else {
+ // Neither checkpoint is usable. Do not seek to 0: the reader's initial
+ // position may already be past an in-file header (the pre-patch code
+ // never seeked in this case), so leave the position untouched.
+ LOG.info("Checkpoint for file(" + file.getAbsolutePath() + ") "
+ + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
+ + "requested checkpoint time: " + checkpointWriteOrderID
+ + " and position " + lastCheckpointPosition);
+ }
} else {
- LOG.warn("Checkpoint for file(" + file.getAbsolutePath() + ") "
- + "is: " + lastCheckpointWriteOrderID + ", which is beyond the "
- + "requested checkpoint time: " + checkpointWriteOrderID
- + " and position " + lastCheckpointPosition);
+ // No checkpoint position was ever recorded for this file.
+ LOG.info("No checkpoint position recorded for file("
+ + file.getAbsolutePath() + "), reading from the current position");
}
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index f51935c..d9a2a9b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -18,6 +18,17 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.flume.channel.file.encryption.CipherProvider;
+import org.apache.flume.channel.file.encryption.CipherProviderFactory;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.apache.flume.channel.file.proto.ProtosFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
@@ -28,19 +39,6 @@ import java.security.Key;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
-import javax.annotation.Nullable;
-
-import org.apache.flume.channel.file.proto.ProtosFactory;
-import org.apache.flume.channel.file.encryption.CipherProvider;
-import org.apache.flume.channel.file.encryption.CipherProviderFactory;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.GeneratedMessage;
-
/**
* Represents a single data file on disk. Has methods to write,
* read sequentially (replay), and read randomly (channel takes).
@@ -81,9 +79,15 @@ class LogFileV3 extends LogFile {
ProtosFactory.LogFileMetaData.newBuilder(logFileMetaData);
metaDataBuilder.setCheckpointPosition(currentPosition);
metaDataBuilder.setCheckpointWriteOrderID(logWriteOrderID);
+ /*
+ * Set the previous checkpoint position and write order id so that it
+ * would be possible to recover from a backup.
+ */
+ metaDataBuilder.setBackupCheckpointPosition(logFileMetaData
+ .getCheckpointPosition());
+ metaDataBuilder.setBackupCheckpointWriteOrderID(logFileMetaData
+ .getCheckpointWriteOrderID());
logFileMetaData = metaDataBuilder.build();
- LOGGER.info("Updating " + metaDataFile.getName() + " currentPosition = "
- + currentPosition + ", logWriteOrderID = " + logWriteOrderID);
writeDelimitedTo(logFileMetaData, metaDataFile);
}
}
@@ -101,7 +105,7 @@ class LogFileV3 extends LogFile {
FileInputStream inputStream = new FileInputStream(metaDataFile);
try {
ProtosFactory.LogFileMetaData metaData = Preconditions.checkNotNull(
- ProtosFactory.LogFileMetaData.
+ ProtosFactory.LogFileMetaData.
parseDelimitedFrom(inputStream), "Metadata cannot be null");
if (metaData.getLogFileID() != logFileID) {
throw new IOException("The file id of log file: "
@@ -193,6 +197,8 @@ class LogFileV3 extends LogFile {
metaDataBuilder.setLogFileID(logFileID);
metaDataBuilder.setCheckpointPosition(0L);
metaDataBuilder.setCheckpointWriteOrderID(0L);
+ metaDataBuilder.setBackupCheckpointPosition(0L);
+ metaDataBuilder.setBackupCheckpointWriteOrderID(0L);
File metaDataFile = Serialization.getMetaDataFile(file);
writeDelimitedTo(metaDataBuilder.build(), metaDataFile);
}
@@ -322,6 +328,9 @@ class LogFileV3 extends LogFile {
setLogFileID(metaData.getLogFileID());
setLastCheckpointPosition(metaData.getCheckpointPosition());
setLastCheckpointWriteOrderID(metaData.getCheckpointWriteOrderID());
+ setPreviousCheckpointPosition(metaData.getBackupCheckpointPosition());
+ setPreviousCheckpointWriteOrderID(
+ metaData.getBackupCheckpointWriteOrderID());
} finally {
try {
inputStream.close();
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
index fa4fd9d..fc47b23 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ReplayHandler.java
@@ -18,6 +18,19 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import org.apache.commons.collections.MultiMap;
+import org.apache.commons.collections.map.MultiValueMap;
+import org.apache.flume.channel.file.encryption.KeyProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
@@ -28,20 +41,6 @@ import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
-import javax.annotation.Nullable;
-
-import org.apache.commons.collections.MultiMap;
-import org.apache.commons.collections.map.MultiValueMap;
-import org.apache.flume.channel.file.encryption.KeyProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-
/**
* Processes a set of data logs, replaying said logs into the queue.
*/
@@ -69,6 +68,33 @@ class ReplayHandler {
* finding the put and commit in logdir2.
*/
private final List<Long> pendingTakes;
+ int readCount = 0;
+ int putCount = 0;
+ int takeCount = 0;
+ int rollbackCount = 0;
+ int commitCount = 0;
+ int skipCount = 0;
+
+ @VisibleForTesting
+ public int getReadCount() {
+ return readCount;
+ }
+ @VisibleForTesting
+ public int getPutCount() {
+ return putCount;
+ }
+ @VisibleForTesting
+ public int getTakeCount() {
+ return takeCount;
+ }
+ @VisibleForTesting
+ public int getCommitCount() {
+ return commitCount;
+ }
+ @VisibleForTesting
+ public int getRollbackCount() {
+ return rollbackCount;
+ }
ReplayHandler(FlumeEventQueue queue,
@Nullable KeyProvider encryptionKeyProvider) {
@@ -110,12 +136,7 @@ class ReplayHandler {
// for puts the fileId is the fileID of the file they exist in
// for takes the fileId and offset are pointers to a put
int fileId = reader.getLogFileID();
- int readCount = 0;
- int putCount = 0;
- int takeCount = 0;
- int rollbackCount = 0;
- int commitCount = 0;
- int skipCount = 0;
+
while ((entry = reader.next()) != null) {
int offset = entry.getOffset();
TransactionEventRecord record = entry.getEvent();
@@ -160,7 +181,7 @@ class ReplayHandler {
}
} else {
Preconditions.checkArgument(false, "Unknown record type: "
- + Integer.toHexString(type));
+ + Integer.toHexString(type));
}
} else {
@@ -255,12 +276,6 @@ class ReplayHandler {
}
LogRecord entry = null;
FlumeEventPointer ptr = null;
- int readCount = 0;
- int putCount = 0;
- int takeCount = 0;
- int rollbackCount = 0;
- int commitCount = 0;
- int skipCount = 0;
while ((entry = next()) != null) {
// for puts the fileId is the fileID of the file they exist in
// for takes the fileId and offset are pointers to a put
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index 7094d3c..d6897e1 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -18,11 +18,20 @@
*/
package org.apache.flume.channel.file;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.io.BufferedInputStream;
import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Collections;
+import java.util.Set;
class Serialization {
private Serialization() {}
@@ -38,6 +47,9 @@ class Serialization {
static final String METADATA_TMP_FILENAME = ".tmp";
static final String OLD_METADATA_FILENAME = METADATA_FILENAME + ".old";
+ // 64 K buffer to copy files.
+ private static final int FILE_COPY_BUFFER_SIZE = 64 * 1024;
+
public static final Logger LOG = LoggerFactory.getLogger(Serialization.class);
static File getMetaDataTempFile(File metaDataFile) {
@@ -60,20 +72,39 @@ class Serialization {
/**
* Deletes all files in given directory.
* @param checkpointDir - The directory whose files are to be deleted
+ * @param excludes - Names of files which should not be deleted from this
+ * directory.
* @return - true if all files were successfully deleted, false otherwise.
*/
- static boolean deleteAllFiles(File checkpointDir) {
+ static boolean deleteAllFiles(File checkpointDir,
+ @Nullable Set<String> excludes) {
if (!checkpointDir.isDirectory()) {
return false;
}
- StringBuilder builder = new StringBuilder("Deleted the following files from"
- + " the checkpoint directory: ");
+
File[] files = checkpointDir.listFiles();
+ if(files == null) {
+ return false;
+ }
+ StringBuilder builder;
+ if (files.length == 0) {
+ return true;
+ } else {
+ builder = new StringBuilder("Deleted the following files: ");
+ }
+ if(excludes == null) { // treat a null excludes set as "exclude nothing"
+ excludes = Collections.emptySet();
+ }
for (File file : files) {
+ if(excludes.contains(file.getName())) {
+ LOG.info("Skipping " + file.getName() + " because it is in excludes " +
+ "set");
+ continue;
+ }
if (!FileUtils.deleteQuietly(file)) {
LOG.info(builder.toString());
LOG.error("Error while attempting to delete: " +
- file.getName());
+ file.getAbsolutePath());
return false;
}
builder.append(", ").append(file.getName());
@@ -82,4 +113,70 @@ class Serialization {
LOG.info(builder.toString());
return true;
}
+
+ /**
+ * Copy a file using a 64K size buffer, then fsync the destination to disk.
+ * Any failure is logged and rethrown via Throwables.propagate.
+ * @param from File to copy - this file should exist
+ * @param to Destination file - this file should not exist
+ * @return true if the copy was successful (failures throw, never false)
+ */
+ static boolean copyFile(File from, File to) throws IOException {
+ Preconditions.checkNotNull(from, "Source file is null, file copy failed.");
+ Preconditions.checkNotNull(to, "Destination file is null, " +
+ "file copy failed.");
+ Preconditions.checkState(from.exists(), "Source file: " + from.toString() +
+ " does not exist.");
+ Preconditions.checkState(!to.exists(), "Destination file: "
+ + to.toString() + " unexpectedly exists.");
+
+ BufferedInputStream in = null;
+ RandomAccessFile out = null; //use a RandomAccessFile for easy fsync
+ try {
+ in = new BufferedInputStream(new FileInputStream(from));
+ out = new RandomAccessFile(to, "rw");
+ byte[] buf = new byte[FILE_COPY_BUFFER_SIZE];
+ long total = 0; // long: an int counter overflows for files >= 2 GB
+ while(true) {
+ int read = in.read(buf);
+ if (read == -1) {
+ break;
+ }
+ out.write(buf, 0, read);
+ total += read;
+ }
+ out.getFD().sync();
+ Preconditions.checkState(total == from.length(),
+ "The size of the origin file and destination file are not equal.");
+ return true;
+ } catch (Exception ex) {
+ LOG.error("Error while attempting to copy " + from.toString() + " to "
+ + to.toString() + ".", ex);
+ Throwables.propagate(ex);
+ } finally {
+ Throwable th = null;
+ try {
+ if (in != null) {
+ in.close();
+ }
+ } catch (Throwable ex) {
+ LOG.error("Error while closing input file.", ex);
+ th = ex;
+ }
+ try {
+ if (out != null) {
+ out.close();
+ }
+ } catch (IOException ex) {
+ LOG.error("Error while closing output file.", ex);
+ Throwables.propagate(ex);
+ }
+ if (th != null) {
+ Throwables.propagate(th);
+ }
+ }
+ // Should never reach here.
+ throw new IOException("Copying file: " + from.toString() + " to: " + to
+ .toString() + " may have failed.");
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
index e6d4957..4860ac2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/proto/ProtosFactory.java
@@ -1286,6 +1286,14 @@ public final class ProtosFactory {
boolean hasEncryption();
org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption getEncryption();
org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryptionOrBuilder getEncryptionOrBuilder();
+
+ // optional sfixed64 backupCheckpointPosition = 6;
+ boolean hasBackupCheckpointPosition();
+ long getBackupCheckpointPosition();
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ boolean hasBackupCheckpointWriteOrderID();
+ long getBackupCheckpointWriteOrderID();
}
public static final class LogFileMetaData extends
com.google.protobuf.GeneratedMessage
@@ -1369,12 +1377,34 @@ public final class ProtosFactory {
return encryption_;
}
+ // optional sfixed64 backupCheckpointPosition = 6;
+ public static final int BACKUPCHECKPOINTPOSITION_FIELD_NUMBER = 6;
+ private long backupCheckpointPosition_;
+ public boolean hasBackupCheckpointPosition() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getBackupCheckpointPosition() {
+ return backupCheckpointPosition_;
+ }
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ public static final int BACKUPCHECKPOINTWRITEORDERID_FIELD_NUMBER = 7;
+ private long backupCheckpointWriteOrderID_;
+ public boolean hasBackupCheckpointWriteOrderID() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getBackupCheckpointWriteOrderID() {
+ return backupCheckpointWriteOrderID_;
+ }
+
private void initFields() {
version_ = 0;
logFileID_ = 0;
checkpointPosition_ = 0L;
checkpointWriteOrderID_ = 0L;
encryption_ = org.apache.flume.channel.file.proto.ProtosFactory.LogFileEncryption.getDefaultInstance();
+ backupCheckpointPosition_ = 0L;
+ backupCheckpointWriteOrderID_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -1425,6 +1455,12 @@ public final class ProtosFactory {
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(5, encryption_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeSFixed64(6, backupCheckpointPosition_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ output.writeSFixed64(7, backupCheckpointWriteOrderID_);
+ }
getUnknownFields().writeTo(output);
}
@@ -1454,6 +1490,14 @@ public final class ProtosFactory {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, encryption_);
}
+ if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeSFixed64Size(6, backupCheckpointPosition_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeSFixed64Size(7, backupCheckpointWriteOrderID_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -1593,6 +1637,10 @@ public final class ProtosFactory {
encryptionBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000010);
+ backupCheckpointPosition_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000020);
+ backupCheckpointWriteOrderID_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -1655,6 +1703,14 @@ public final class ProtosFactory {
} else {
result.encryption_ = encryptionBuilder_.build();
}
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000020;
+ }
+ result.backupCheckpointPosition_ = backupCheckpointPosition_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
+ result.backupCheckpointWriteOrderID_ = backupCheckpointWriteOrderID_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -1686,6 +1742,12 @@ public final class ProtosFactory {
if (other.hasEncryption()) {
mergeEncryption(other.getEncryption());
}
+ if (other.hasBackupCheckpointPosition()) {
+ setBackupCheckpointPosition(other.getBackupCheckpointPosition());
+ }
+ if (other.hasBackupCheckpointWriteOrderID()) {
+ setBackupCheckpointWriteOrderID(other.getBackupCheckpointWriteOrderID());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -1768,6 +1830,16 @@ public final class ProtosFactory {
setEncryption(subBuilder.buildPartial());
break;
}
+ case 49: {
+ bitField0_ |= 0x00000020;
+ backupCheckpointPosition_ = input.readSFixed64();
+ break;
+ }
+ case 57: {
+ bitField0_ |= 0x00000040;
+ backupCheckpointWriteOrderID_ = input.readSFixed64();
+ break;
+ }
}
}
}
@@ -1948,6 +2020,48 @@ public final class ProtosFactory {
return encryptionBuilder_;
}
+ // optional sfixed64 backupCheckpointPosition = 6;
+ private long backupCheckpointPosition_ ;
+ public boolean hasBackupCheckpointPosition() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public long getBackupCheckpointPosition() {
+ return backupCheckpointPosition_;
+ }
+ public Builder setBackupCheckpointPosition(long value) {
+ bitField0_ |= 0x00000020;
+ backupCheckpointPosition_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearBackupCheckpointPosition() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ backupCheckpointPosition_ = 0L;
+ onChanged();
+ return this;
+ }
+
+ // optional sfixed64 backupCheckpointWriteOrderID = 7;
+ private long backupCheckpointWriteOrderID_ ;
+ public boolean hasBackupCheckpointWriteOrderID() {
+ return ((bitField0_ & 0x00000040) == 0x00000040);
+ }
+ public long getBackupCheckpointWriteOrderID() {
+ return backupCheckpointWriteOrderID_;
+ }
+ public Builder setBackupCheckpointWriteOrderID(long value) {
+ bitField0_ |= 0x00000040;
+ backupCheckpointWriteOrderID_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearBackupCheckpointWriteOrderID() {
+ bitField0_ = (bitField0_ & ~0x00000040);
+ backupCheckpointWriteOrderID_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:LogFileMetaData)
}
@@ -5921,23 +6035,25 @@ public final class ProtosFactory {
"sion\030\001 \002(\017\022\024\n\014writeOrderID\030\002 \002(\020\022\021\n\tqueu" +
"eSize\030\003 \002(\017\022\021\n\tqueueHead\030\004 \002(\017\022\036\n\nactive" +
"Logs\030\005 \003(\0132\n.ActiveLog\"-\n\tActiveLog\022\021\n\tl" +
- "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\231\001\n\017LogFil" +
+ "ogFileID\030\001 \002(\017\022\r\n\005count\030\002 \002(\017\"\341\001\n\017LogFil" +
"eMetaData\022\017\n\007version\030\001 \002(\017\022\021\n\tlogFileID\030" +
"\002 \002(\017\022\032\n\022checkpointPosition\030\003 \002(\020\022\036\n\026che" +
"ckpointWriteOrderID\030\004 \002(\020\022&\n\nencryption\030" +
- "\005 \001(\0132\022.LogFileEncryption\"Q\n\021LogFileEncr" +
- "yption\022\026\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAli",
- "as\030\002 \002(\t\022\022\n\nparameters\030\003 \001(\014\"S\n\026Transact" +
- "ionEventHeader\022\014\n\004type\030\001 \002(\017\022\025\n\rtransact" +
- "ionID\030\002 \002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put" +
- "\022\032\n\005event\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006" +
- "fileID\030\001 \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback" +
- "\"\026\n\006Commit\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionE" +
- "ventFooter\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(" +
- "\0132\021.FlumeEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020Fl" +
- "umeEventHeader\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002" +
- "(\tB4\n#org.apache.flume.channel.file.prot",
- "oB\rProtosFactory"
+ "\005 \001(\0132\022.LogFileEncryption\022 \n\030backupCheck" +
+ "pointPosition\030\006 \001(\020\022$\n\034backupCheckpointW",
+ "riteOrderID\030\007 \001(\020\"Q\n\021LogFileEncryption\022\026" +
+ "\n\016cipherProvider\030\001 \002(\t\022\020\n\010keyAlias\030\002 \002(\t" +
+ "\022\022\n\nparameters\030\003 \001(\014\"S\n\026TransactionEvent" +
+ "Header\022\014\n\004type\030\001 \002(\017\022\025\n\rtransactionID\030\002 " +
+ "\002(\020\022\024\n\014writeOrderID\030\003 \002(\020\"!\n\003Put\022\032\n\005even" +
+ "t\030\001 \002(\0132\013.FlumeEvent\"&\n\004Take\022\016\n\006fileID\030\001" +
+ " \002(\017\022\016\n\006offset\030\002 \002(\017\"\n\n\010Rollback\"\026\n\006Comm" +
+ "it\022\014\n\004type\030\001 \002(\017\"\030\n\026TransactionEventFoot" +
+ "er\">\n\nFlumeEvent\022\"\n\007headers\030\001 \003(\0132\021.Flum" +
+ "eEventHeader\022\014\n\004body\030\002 \002(\014\".\n\020FlumeEvent",
+ "Header\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\tB4\n#or" +
+ "g.apache.flume.channel.file.protoB\rProto" +
+ "sFactory"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -5965,7 +6081,7 @@ public final class ProtosFactory {
internal_static_LogFileMetaData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_LogFileMetaData_descriptor,
- new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", },
+ new java.lang.String[] { "Version", "LogFileID", "CheckpointPosition", "CheckpointWriteOrderID", "Encryption", "BackupCheckpointPosition", "BackupCheckpointWriteOrderID", },
org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.class,
org.apache.flume.channel.file.proto.ProtosFactory.LogFileMetaData.Builder.class);
internal_static_LogFileEncryption_descriptor =
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
index 3a4e828..1e668d2 100644
--- a/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
+++ b/flume-ng-channels/flume-file-channel/src/main/proto/filechannel.proto
@@ -38,6 +38,8 @@ message LogFileMetaData {
required sfixed64 checkpointPosition = 3;
required sfixed64 checkpointWriteOrderID = 4;
optional LogFileEncryption encryption = 5;
+ optional sfixed64 backupCheckpointPosition = 6;
+ optional sfixed64 backupCheckpointWriteOrderID = 7;
}
message LogFileEncryption {
http://git-wip-us.apache.org/repos/asf/flume/blob/6ca61680/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
index 3da09ab..1ee5320 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelBase.java
@@ -37,12 +37,15 @@ public class TestFileChannelBase {
protected File checkpointDir;
protected File[] dataDirs;
protected String dataDir;
+ protected File backupDir;
@Before
public void setup() throws Exception {
baseDir = Files.createTempDir();
checkpointDir = new File(baseDir, "chkpt");
+ backupDir = new File(baseDir, "backup");
Assert.assertTrue(checkpointDir.mkdirs() || checkpointDir.isDirectory());
+ Assert.assertTrue(backupDir.mkdirs() || backupDir.isDirectory());
dataDirs = new File[3];
dataDir = "";
for (int i = 0; i < dataDirs.length; i++) {
@@ -68,7 +71,7 @@ public class TestFileChannelBase {
protected Context createContext(Map<String, String> overrides) {
return TestUtils.createFileChannelContext(checkpointDir.getAbsolutePath(),
- dataDir, overrides);
+ dataDir, backupDir.getAbsolutePath(), overrides);
}
protected FileChannel createFileChannel() {
@@ -77,6 +80,6 @@ public class TestFileChannelBase {
protected FileChannel createFileChannel(Map<String, String> overrides) {
return TestUtils.createFileChannel(checkpointDir.getAbsolutePath(),
- dataDir, overrides);
+ dataDir, backupDir.getAbsolutePath(), overrides);
}
}