You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/08/23 16:27:58 UTC
flume git commit: FLUME-3152 Add Flume Metric for Backup Checkpoint Errors
Repository: flume
Updated Branches:
refs/heads/trunk 66327aa20 -> 4d79aa003
FLUME-3152 Add Flume Metric for Backup Checkpoint Errors
This change adds a new metric (channel.file.checkpoint.backup.write.error)
to the File Channel. It is incremented whenever an exception (any Throwable)
is thrown while writing a backup checkpoint.
This closes #156
Reviewers: Denes Arvay
(Ferenc Szabo via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d79aa00
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d79aa00
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d79aa00
Branch: refs/heads/trunk
Commit: 4d79aa003aa02e8d513a1ae1406795d758143397
Parents: 66327aa
Author: Ferenc Szabo <fs...@cloudera.com>
Authored: Mon Aug 21 14:29:38 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Wed Aug 23 18:18:39 2017 +0200
----------------------------------------------------------------------
.../flume/channel/file/CheckpointRebuilder.java | 3 +-
.../file/EventQueueBackingStoreFactory.java | 41 +++++----
.../file/EventQueueBackingStoreFile.java | 24 ++++--
.../file/EventQueueBackingStoreFileV2.java | 8 +-
.../file/EventQueueBackingStoreFileV3.java | 12 +--
.../java/org/apache/flume/channel/file/Log.java | 4 +-
.../instrumentation/FileChannelCounter.java | 16 +++-
.../FileChannelCounterMBean.java | 7 ++
.../flume/channel/file/TestCheckpoint.java | 3 +-
.../channel/file/TestCheckpointRebuilder.java | 3 +-
.../file/TestEventQueueBackingStoreFactory.java | 87 +++++++++++++-------
.../file/TestFileChannelErrorMetrics.java | 67 +++++++++++++++
.../flume/channel/file/TestFlumeEventQueue.java | 19 +++--
13 files changed, 220 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
index a0ecdeb..8fbf3c8 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/CheckpointRebuilder.java
@@ -28,6 +28,7 @@ import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,7 +241,7 @@ public class CheckpointRebuilder {
} else {
EventQueueBackingStore backingStore =
EventQueueBackingStoreFactory.get(checkpointFile,
- capacity, "channel");
+ capacity, "channel", new FileChannelCounter("Main"));
FlumeEventQueue queue = new FlumeEventQueue(backingStore,
new File(checkpointDir, "inflighttakes"),
new File(checkpointDir, "inflightputs"),
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
index dcd6f98..7f8b3f6 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFactory.java
@@ -19,6 +19,7 @@
package org.apache.flume.channel.file;
import com.google.common.io.Files;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,19 +32,22 @@ class EventQueueBackingStoreFactory {
private EventQueueBackingStoreFactory() {}
- static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name) throws Exception {
- return get(checkpointFile, capacity, name, true);
+ static EventQueueBackingStore get(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws Exception {
+ return get(checkpointFile, capacity, name, counter, true);
}
- static EventQueueBackingStore get(File checkpointFile, int capacity,
- String name, boolean upgrade) throws Exception {
- return get(checkpointFile, null, capacity, name, upgrade, false, false);
+ static EventQueueBackingStore get(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter, boolean upgrade
+ ) throws Exception {
+ return get(checkpointFile, null, capacity, name, counter, upgrade, false, false);
}
- static EventQueueBackingStore get(File checkpointFile, File backupCheckpointDir,
- int capacity, String name, boolean upgrade,
- boolean shouldBackup, boolean compressBackup) throws Exception {
+ static EventQueueBackingStore get(
+ File checkpointFile, File backupCheckpointDir, int capacity, String name,
+ FileChannelCounter counter, boolean upgrade, boolean shouldBackup, boolean compressBackup
+ ) throws Exception {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
RandomAccessFile checkpointFileHandle = null;
try {
@@ -69,21 +73,21 @@ class EventQueueBackingStoreFactory {
throw new IOException("Cannot create " + checkpointFile);
}
return new EventQueueBackingStoreFileV3(checkpointFile,
- capacity, name, backupCheckpointDir, shouldBackup, compressBackup);
+ capacity, name, counter, backupCheckpointDir, shouldBackup, compressBackup);
}
// v3 due to meta file, version will be checked by backing store
if (metaDataExists) {
return new EventQueueBackingStoreFileV3(checkpointFile, capacity,
- name, backupCheckpointDir, shouldBackup, compressBackup);
+ name, counter, backupCheckpointDir, shouldBackup, compressBackup);
}
checkpointFileHandle = new RandomAccessFile(checkpointFile, "r");
int version = (int) checkpointFileHandle.readLong();
if (Serialization.VERSION_2 == version) {
if (upgrade) {
return upgrade(checkpointFile, capacity, name, backupCheckpointDir,
- shouldBackup, compressBackup);
+ shouldBackup, compressBackup, counter);
}
- return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+ return new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter);
}
LOG.error("Found version " + Integer.toHexString(version) + " in " +
checkpointFile);
@@ -100,12 +104,13 @@ class EventQueueBackingStoreFactory {
}
}
- private static EventQueueBackingStore upgrade(File checkpointFile, int capacity, String name,
- File backupCheckpointDir, boolean shouldBackup,
- boolean compressBackup) throws Exception {
+ private static EventQueueBackingStore upgrade(
+ File checkpointFile, int capacity, String name, File backupCheckpointDir,
+ boolean shouldBackup, boolean compressBackup, FileChannelCounter counter
+ ) throws Exception {
LOG.info("Attempting upgrade of " + checkpointFile + " for " + name);
EventQueueBackingStoreFileV2 backingStoreV2 =
- new EventQueueBackingStoreFileV2(checkpointFile, capacity, name);
+ new EventQueueBackingStoreFileV2(checkpointFile, capacity, name, counter);
String backupName = checkpointFile.getName() + "-backup-"
+ System.currentTimeMillis();
Files.copy(checkpointFile,
@@ -113,7 +118,7 @@ class EventQueueBackingStoreFactory {
File metaDataFile = Serialization.getMetaDataFile(checkpointFile);
EventQueueBackingStoreFileV3.upgrade(backingStoreV2, checkpointFile,
metaDataFile);
- return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name,
+ return new EventQueueBackingStoreFileV3(checkpointFile, capacity, name, counter,
backupCheckpointDir, shouldBackup, compressBackup);
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 73f1d4c..445d912 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Maps;
import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +61,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
protected final Map<Integer, AtomicInteger> logFileIDReferenceCounts = Maps.newHashMap();
protected final MappedByteBuffer mappedBuffer;
protected final RandomAccessFile checkpointFileHandle;
+ private final FileChannelCounter fileChannelCounter;
protected final File checkpointFile;
private final Semaphore backupCompletedSema = new Semaphore(1);
protected final boolean shouldBackup;
@@ -67,17 +69,18 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
private final File backupDir;
private final ExecutorService checkpointBackUpExecutor;
- protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile) throws IOException,
- BadCheckpointException {
- this(capacity, name, checkpointFile, null, false, false);
+ protected EventQueueBackingStoreFile(
+ int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile
+ ) throws IOException, BadCheckpointException {
+ this(capacity, name, fileChannelCounter, checkpointFile, null, false, false);
}
- protected EventQueueBackingStoreFile(int capacity, String name,
- File checkpointFile, File checkpointBackupDir,
- boolean backupCheckpoint, boolean compressBackup)
- throws IOException, BadCheckpointException {
+ protected EventQueueBackingStoreFile(
+ int capacity, String name, FileChannelCounter fileChannelCounter, File checkpointFile,
+ File checkpointBackupDir, boolean backupCheckpoint, boolean compressBackup
+ ) throws IOException, BadCheckpointException {
super(capacity, name);
+ this.fileChannelCounter = fileChannelCounter;
this.checkpointFile = checkpointFile;
this.shouldBackup = backupCheckpoint;
this.compressBackup = compressBackup;
@@ -294,6 +297,7 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
try {
backupCheckpoint(backupDir);
} catch (Throwable throwable) {
+ fileChannelCounter.incrementCheckpointBackupWriteErrorCount();
error = true;
LOG.error("Backing up of checkpoint directory failed.", throwable);
} finally {
@@ -432,7 +436,9 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
}
int capacity = (int) ((file.length() - (HEADER_SIZE * 8L)) / 8L);
EventQueueBackingStoreFile backingStore = (EventQueueBackingStoreFile)
- EventQueueBackingStoreFactory.get(file, capacity, "debug", false);
+ EventQueueBackingStoreFactory.get(
+ file, capacity, "debug", new FileChannelCounter("Main"), false
+ );
System.out.println("File Reference Counts"
+ backingStore.logFileIDReferenceCounts);
System.out.println("Queue Capacity " + backingStore.getCapacity());
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
index 71183aa..3711a78 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV2.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Preconditions;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
@@ -33,9 +34,10 @@ final class EventQueueBackingStoreFileV2 extends EventQueueBackingStoreFile {
private static final int INDEX_ACTIVE_LOG = 5;
private static final int MAX_ACTIVE_LOGS = 1024;
- EventQueueBackingStoreFileV2(File checkpointFile, int capacity, String name)
- throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile);
+ EventQueueBackingStoreFileV2(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws IOException, BadCheckpointException {
+ super(capacity, name, counter, checkpointFile);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
index f1a892a..da5a082 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFileV3.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.file;
import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.apache.flume.channel.file.proto.ProtosFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,16 +37,17 @@ final class EventQueueBackingStoreFileV3 extends EventQueueBackingStoreFile {
private static final Logger LOG = LoggerFactory.getLogger(EventQueueBackingStoreFileV3.class);
private final File metaDataFile;
- EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
- String name) throws IOException, BadCheckpointException {
- this(checkpointFile, capacity, name, null, false, false);
+ EventQueueBackingStoreFileV3(
+ File checkpointFile, int capacity, String name, FileChannelCounter counter
+ ) throws IOException, BadCheckpointException {
+ this(checkpointFile, capacity, name, counter, null, false, false);
}
EventQueueBackingStoreFileV3(File checkpointFile, int capacity,
- String name, File checkpointBackupDir,
+ String name, FileChannelCounter counter, File checkpointBackupDir,
boolean backupCheckpoint, boolean compressBackup)
throws IOException, BadCheckpointException {
- super(capacity, name, checkpointFile, checkpointBackupDir, backupCheckpoint,
+ super(capacity, name, counter, checkpointFile, checkpointBackupDir, backupCheckpoint,
compressBackup);
Preconditions.checkArgument(capacity > 0,
"capacity must be greater than 0 " + capacity);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
index 1662a5b..efc8d14 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Log.java
@@ -450,7 +450,7 @@ public class Log {
backingStore =
EventQueueBackingStoreFactory.get(checkpointFile,
backupCheckpointDir, queueCapacity, channelNameDescriptor,
- true, this.useDualCheckpoints,
+ channelCounter, true, this.useDualCheckpoints,
this.compressBackupCheckpoint);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
@@ -487,7 +487,7 @@ public class Log {
}
backingStore = EventQueueBackingStoreFactory.get(
checkpointFile, backupCheckpointDir, queueCapacity,
- channelNameDescriptor, true, useDualCheckpoints,
+ channelNameDescriptor, channelCounter, true, useDualCheckpoints,
compressBackupCheckpoint);
queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
index 40470a8..6cec3da 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounter.java
@@ -28,10 +28,15 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou
private static final String EVENT_PUT_ERROR_COUNT = "channel.file.event.put.error";
private static final String EVENT_TAKE_ERROR_COUNT = "channel.file.event.take.error";
private static final String CHECKPOINT_WRITE_ERROR_COUNT = "channel.file.checkpoint.write.error";
+ private static final String CHECKPOINT_BACKUP_WRITE_ERROR_COUNT
+ = "channel.file.checkpoint.backup.write.error";
public FileChannelCounter(String name) {
super(name, new String[] {
- EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT, CHECKPOINT_WRITE_ERROR_COUNT });
+ EVENT_PUT_ERROR_COUNT, EVENT_TAKE_ERROR_COUNT,
+ CHECKPOINT_WRITE_ERROR_COUNT, CHECKPOINT_BACKUP_WRITE_ERROR_COUNT
+ }
+ );
}
@Override
@@ -83,4 +88,13 @@ public class FileChannelCounter extends ChannelCounter implements FileChannelCou
public void incrementCheckpointWriteErrorCount() {
increment(CHECKPOINT_WRITE_ERROR_COUNT);
}
+
+ @Override
+ public long getCheckpointBackupWriteErrorCount() {
+ return get(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+ }
+
+ public void incrementCheckpointBackupWriteErrorCount() {
+ increment(CHECKPOINT_BACKUP_WRITE_ERROR_COUNT);
+ }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
index 175b1f4..9386094 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/instrumentation/FileChannelCounterMBean.java
@@ -62,4 +62,11 @@ public interface FileChannelCounterMBean extends ChannelCounterMBean {
* @see org.apache.flume.channel.file.Log.BackgroundWorker#run()
*/
long getCheckpointWriteErrorCount();
+
+ /**
+ * A count of the number of errors encountered while trying to write the backup checkpoints. This
+ * includes any Throwables.
+ * @see org.apache.flume.channel.file.EventQueueBackingStoreFile#startBackupThread()
+ */
+ long getCheckpointBackupWriteErrorCount();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
index cd1dcd9..1e00ee2 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpoint.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import junit.framework.Assert;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -51,7 +52,7 @@ public class TestCheckpoint {
@Test
public void testSerialization() throws Exception {
EventQueueBackingStore backingStore =
- new EventQueueBackingStoreFileV2(file, 1, "test");
+ new EventQueueBackingStoreFileV2(file, 1, "test", new FileChannelCounter("test"));
FlumeEventPointer ptrIn = new FlumeEventPointer(10, 20);
FlumeEventQueue queueIn = new FlumeEventQueue(backingStore,
inflightTakes, inflightPuts, queueSet);
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
index c6c6ad3..6c91661 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestCheckpointRebuilder.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.util.Map;
import java.util.Set;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -70,7 +71,7 @@ public class TestCheckpointRebuilder extends TestFileChannelBase {
Assert.assertTrue(inflightPutsFile.delete());
EventQueueBackingStore backingStore =
EventQueueBackingStoreFactory.get(checkpointFile, 50,
- "test");
+ "test", new FileChannelCounter("test"));
FlumeEventQueue queue = new FlumeEventQueue(backingStore, inflightTakesFile,
inflightPutsFile, queueSetDir);
CheckpointRebuilder checkpointRebuilder =
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
index 0939454..7aebb03 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestEventQueueBackingStoreFactory.java
@@ -23,6 +23,7 @@ import com.google.common.io.Files;
import com.google.protobuf.InvalidProtocolBufferException;
import junit.framework.Assert;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.apache.flume.channel.file.proto.ProtosFactory;
import org.junit.After;
import org.junit.Before;
@@ -75,29 +76,39 @@ public class TestEventQueueBackingStoreFactory {
@Test
public void testWithNoFlag() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test"),
- Serialization.VERSION_3, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test")),
+ Serialization.VERSION_3, pointersInTestCheckpoint
+ );
}
@Test
public void testWithFlag() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", true),
- Serialization.VERSION_3, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), true
+ ),
+ Serialization.VERSION_3, pointersInTestCheckpoint
+ );
}
@Test
public void testNoUprade() throws Exception {
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
- Serialization.VERSION_2, pointersInTestCheckpoint);
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), false
+ ),
+ Serialization.VERSION_2, pointersInTestCheckpoint
+ );
}
@Test(expected = BadCheckpointException.class)
public void testDecreaseCapacity() throws Exception {
Assert.assertTrue(checkpoint.delete());
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
- EventQueueBackingStoreFactory.get(checkpoint, 9, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 9, "test", new FileChannelCounter("test"));
Assert.fail();
}
@@ -105,17 +116,21 @@ public class TestEventQueueBackingStoreFactory {
public void testIncreaseCapacity() throws Exception {
Assert.assertTrue(checkpoint.delete());
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
- EventQueueBackingStoreFactory.get(checkpoint, 11, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 11, "test", new FileChannelCounter("test"));
Assert.fail();
}
@Test
public void testNewCheckpoint() throws Exception {
Assert.assertTrue(checkpoint.delete());
- verify(EventQueueBackingStoreFactory.get(checkpoint, 10, "test", false),
- Serialization.VERSION_3, Collections.<Long>emptyList());
+ verify(
+ EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test"), false
+ ),
+ Serialization.VERSION_3, Collections.<Long>emptyList()
+ );
}
@Test(expected = BadCheckpointException.class)
@@ -123,13 +138,15 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
writer.writeLong(94L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -141,12 +158,14 @@ public class TestEventQueueBackingStoreFactory {
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_CHECKPOINT_MARKER * Serialization.SIZE_OF_LONG);
writer.writeLong(EventQueueBackingStoreFile.CHECKPOINT_INCOMPLETE);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -157,12 +176,14 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_VERSION * Serialization.SIZE_OF_LONG);
writer.writeLong(2L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -173,7 +194,7 @@ public class TestEventQueueBackingStoreFactory {
FileOutputStream os = null;
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -184,7 +205,9 @@ public class TestEventQueueBackingStoreFactory {
os = new FileOutputStream(Serialization.getMetaDataFile(checkpoint));
meta.toBuilder().setVersion(2).build().writeDelimitedTo(os);
os.flush();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
os.close();
}
@@ -195,12 +218,14 @@ public class TestEventQueueBackingStoreFactory {
RandomAccessFile writer = new RandomAccessFile(checkpoint, "rw");
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
writer.seek(EventQueueBackingStoreFile.INDEX_WRITE_ORDER_ID * Serialization.SIZE_OF_LONG);
writer.writeLong(2L);
writer.getFD().sync();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
writer.close();
}
@@ -211,7 +236,7 @@ public class TestEventQueueBackingStoreFactory {
FileOutputStream os = null;
try {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
Assert.assertTrue(Serialization.getMetaDataFile(checkpoint).length() != 0);
@@ -223,7 +248,9 @@ public class TestEventQueueBackingStoreFactory {
Serialization.getMetaDataFile(checkpoint));
meta.toBuilder().setWriteOrderID(1).build().writeDelimitedTo(os);
os.flush();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} finally {
os.close();
}
@@ -232,7 +259,7 @@ public class TestEventQueueBackingStoreFactory {
@Test(expected = BadCheckpointException.class)
public void testTruncateMeta() throws Exception {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -241,13 +268,15 @@ public class TestEventQueueBackingStoreFactory {
writer.setLength(0);
writer.getFD().sync();
writer.close();
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
}
@Test(expected = InvalidProtocolBufferException.class)
public void testCorruptMeta() throws Throwable {
EventQueueBackingStore backingStore =
- EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ EventQueueBackingStoreFactory.get(checkpoint, 10, "test", new FileChannelCounter("test"));
backingStore.close();
Assert.assertTrue(checkpoint.exists());
File metaFile = Serialization.getMetaDataFile(checkpoint);
@@ -258,7 +287,9 @@ public class TestEventQueueBackingStoreFactory {
writer.getFD().sync();
writer.close();
try {
- backingStore = EventQueueBackingStoreFactory.get(checkpoint, 10, "test");
+ backingStore = EventQueueBackingStoreFactory.get(
+ checkpoint, 10, "test", new FileChannelCounter("test")
+ );
} catch (BadCheckpointException ex) {
throw ex.getCause();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
index d0237db..e2d1ee6 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelErrorMetrics.java
@@ -31,7 +31,10 @@ import org.mockito.stubbing.Answer;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
@@ -230,6 +233,70 @@ public class TestFileChannelErrorMetrics extends TestFileChannelBase {
assertFalse(channel.getChannelCounter().isOpen());
}
+ /**
+ * Checks that a failed backup checkpoint write is reported through
+ * {@code FileChannelCounter#getCheckpointBackupWriteErrorCount()}.
+ *
+ * The backing store is built directly on a temporary checkpoint file and a
+ * temporary backup directory, and {@code checkpoint()} is then called without
+ * a preceding {@code beforeCheckpoint()} call.
+ */
+ @Test
+ public void testCheckpointBackupWriteErrorShouldIncreaseCounter()
+ throws IOException, InterruptedException {
+ FileChannelCounter fileChannelCounter = new FileChannelCounter("test");
+ File checkpointFile = File.createTempFile("checkpoint", ".tmp");
+ File backupDir = Files.createTempDirectory("checkpoint").toFile();
+ backupDir.deleteOnExit();
+ checkpointFile.deleteOnExit();
+ // NOTE(review): the trailing "true, false" arguments presumably enable the backup
+ // checkpoint and disable backup compression -- confirm against the constructor.
+ EventQueueBackingStoreFileV3 backingStoreFileV3 = new EventQueueBackingStoreFileV3(
+ checkpointFile, 1, "test", fileChannelCounter, backupDir,true, false
+ );
+
+ // Exception will be thrown by state check if beforeCheckpoint is not called
+ backingStoreFileV3.checkpoint();
+ // wait for other thread to reach the error state
+ // (the final argument, 100, is presumably a timeout in milliseconds -- confirm
+ // against assertEventuallyTrue)
+ assertEventuallyTrue("checkpoint backup write failure should increase counter to 1",
+ new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return fileChannelCounter.getCheckpointBackupWriteErrorCount() == 1;
+ }
+ },
+ 100
+ );
+ }
+
+ /**
+ * Channel-level check of the backup checkpoint write error counter.
+ *
+ * A file channel is started with dual checkpoints enabled. After the first
+ * committed event the test waits until the "checkpoint" file in the backup
+ * directory has been modified and asserts that the error counter is still 0.
+ * It then deletes the backup directory, commits a second event and waits for
+ * the counter to reach at least 1.
+ */
+ @Test
+ public void testCheckpointBackupWriteErrorShouldIncreaseCounter2()
+ throws Exception {
+ // NOTE(review): presumably milliseconds; also used below (times 3) as the wait limit.
+ int checkpointInterval = 1500;
+ // Typed map instead of the raw Map/HashMap, so the overrides are checked at compile time.
+ Map<String, String> config = new HashMap<>();
+ config.put(FileChannelConfiguration.CHECKPOINT_INTERVAL, String.valueOf(checkpointInterval));
+ config.put(FileChannelConfiguration.USE_DUAL_CHECKPOINTS, "true");
+ final FileChannel channel = createFileChannel(Collections.unmodifiableMap(config));
+ channel.start();
+ Transaction tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test".getBytes()));
+ tx.commit();
+ tx.close();
+ final long beforeCheckpointWrite = System.currentTimeMillis();
+ // first checkpoint should be written successfully -> the counter should remain 0
+ assertEventuallyTrue("checkpoint backup should have been written", new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return new File(backupDir, "checkpoint").lastModified() > beforeCheckpointWrite;
+ }
+ }, checkpointInterval * 3);
+ assertEquals(0, channel.getChannelCounter().getCheckpointBackupWriteErrorCount());
+ FileUtils.deleteDirectory(backupDir);
+ tx = channel.getTransaction();
+ tx.begin();
+ channel.put(EventBuilder.withBody("test2".getBytes()));
+ tx.commit();
+ tx.close();
+ // the backup directory has been deleted, so the backup checkpoint write should have failed
+ assertEventuallyTrue("checkpointBackupWriteErrorCount should be 1", new BooleanPredicate() {
+ @Override
+ public boolean get() {
+ return channel.getChannelCounter().getCheckpointBackupWriteErrorCount() >= 1;
+ }
+ }, checkpointInterval * 3);
+ }
+
// Condition callback that the tests in this class pass to assertEventuallyTrue.
private interface BooleanPredicate {
// Returns the current value of the condition being waited for.
boolean get();
}
http://git-wip-us.apache.org/repos/asf/flume/blob/4d79aa00/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
index f1700f9..9c7352e 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFlumeEventQueue.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.flume.channel.file.instrumentation.FileChannelCounter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -96,7 +97,7 @@ public class TestFlumeEventQueue {
public EventQueueBackingStore get() throws Exception {
Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
return new EventQueueBackingStoreFileV2(getCheckpoint(), 1000,
- "test");
+ "test", new FileChannelCounter("test"));
}
}
},
@@ -105,7 +106,9 @@ public class TestFlumeEventQueue {
@Override
public EventQueueBackingStore get() throws Exception {
Assert.assertTrue(baseDir.isDirectory() || baseDir.mkdirs());
- return new EventQueueBackingStoreFileV3(getCheckpoint(), 1000, "test");
+ return new EventQueueBackingStoreFileV3(
+ getCheckpoint(), 1000, "test", new FileChannelCounter("test")
+ );
}
}
}
@@ -135,7 +138,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, 1, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, 1, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),
@@ -149,7 +154,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, 0, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, 0, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),
@@ -161,7 +168,9 @@ public class TestFlumeEventQueue {
backingStore.close();
File checkpoint = backingStoreSupplier.getCheckpoint();
Assert.assertTrue(checkpoint.delete());
- backingStore = new EventQueueBackingStoreFileV2(checkpoint, -1, "test");
+ backingStore = new EventQueueBackingStoreFileV2(
+ checkpoint, -1, "test", new FileChannelCounter("test")
+ );
queue = new FlumeEventQueue(backingStore,
backingStoreSupplier.getInflightTakes(),
backingStoreSupplier.getInflightPuts(),