You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/08 18:01:59 UTC
git commit: FLUME-1824: Inflights can complete successfully even if checkpoint fails
Updated Branches:
refs/heads/trunk 21c67ed59 -> 1a2e0d7a7
FLUME-1824: Inflights can complete successfully even if checkpoint fails
(Hari Shreedharan via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1a2e0d7a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1a2e0d7a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1a2e0d7a
Branch: refs/heads/trunk
Commit: 1a2e0d7a7629eb32821c780d48a2c17f4e76a59e
Parents: 21c67ed
Author: Brock Noland <br...@apache.org>
Authored: Tue Jan 8 11:01:31 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Tue Jan 8 11:01:31 2013 -0600
----------------------------------------------------------------------
.../flume/channel/file/EventQueueBackingStore.java | 1 +
.../channel/file/EventQueueBackingStoreFile.java | 7 +++++--
.../apache/flume/channel/file/FlumeEventQueue.java | 1 +
3 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index 13b50da..b136eb0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -36,6 +36,7 @@ abstract class EventQueueBackingStore {
}
+ abstract void beginCheckpoint() throws IOException;
abstract void checkpoint() throws IOException;
abstract void incrementFileID(int fileID);
abstract void decrementFileID(int fileID);
http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 186b15a..7f35301 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -104,14 +104,17 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
protected abstract void writeCheckpointMetaData() throws IOException;
@Override
- void checkpoint() throws IOException {
-
+ void beginCheckpoint() throws IOException {
LOG.info("Start checkpoint for " + checkpointFile +
", elements to sync = " + overwriteMap.size());
// Start checkpoint
elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
mappedBuffer.force();
+ }
+
+ @Override
+ void checkpoint() throws IOException {
setLogWriteOrderID(WriteOrderOracle.next());
LOG.info("Updating checkpoint metadata: logWriteOrderID: "
http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 74a2bc8..0f9456b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -101,6 +101,7 @@ final class FlumeEventQueue {
LOG.debug("Checkpoint not required");
return false;
}
+ backingStore.beginCheckpoint();
inflightPuts.serializeAndWrite();
inflightTakes.serializeAndWrite();
backingStore.checkpoint();