You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by br...@apache.org on 2013/01/08 18:01:59 UTC

git commit: FLUME-1824: Inflights can complete successfully even if checkpoint fails

Updated Branches:
  refs/heads/trunk 21c67ed59 -> 1a2e0d7a7


FLUME-1824: Inflights can complete successfully even if checkpoint fails

(Hari Shreedharan via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1a2e0d7a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1a2e0d7a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1a2e0d7a

Branch: refs/heads/trunk
Commit: 1a2e0d7a7629eb32821c780d48a2c17f4e76a59e
Parents: 21c67ed
Author: Brock Noland <br...@apache.org>
Authored: Tue Jan 8 11:01:31 2013 -0600
Committer: Brock Noland <br...@apache.org>
Committed: Tue Jan 8 11:01:31 2013 -0600

----------------------------------------------------------------------
 .../flume/channel/file/EventQueueBackingStore.java |    1 +
 .../channel/file/EventQueueBackingStoreFile.java   |    7 +++++--
 .../apache/flume/channel/file/FlumeEventQueue.java |    1 +
 3 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
index 13b50da..b136eb0 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStore.java
@@ -36,6 +36,7 @@ abstract class EventQueueBackingStore {
   }
 
 
+  abstract void beginCheckpoint() throws IOException;
   abstract void checkpoint() throws IOException;
   abstract void incrementFileID(int fileID);
   abstract void decrementFileID(int fileID);

http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
index 186b15a..7f35301 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/EventQueueBackingStoreFile.java
@@ -104,14 +104,17 @@ abstract class EventQueueBackingStoreFile extends EventQueueBackingStore {
   protected abstract void writeCheckpointMetaData() throws IOException;
 
   @Override
-  void checkpoint()  throws IOException {
-
+  void beginCheckpoint() throws IOException {
     LOG.info("Start checkpoint for " + checkpointFile +
         ", elements to sync = " + overwriteMap.size());
 
     // Start checkpoint
     elementsBuffer.put(INDEX_CHECKPOINT_MARKER, CHECKPOINT_INCOMPLETE);
     mappedBuffer.force();
+  }
+
+  @Override
+  void checkpoint()  throws IOException {
 
     setLogWriteOrderID(WriteOrderOracle.next());
     LOG.info("Updating checkpoint metadata: logWriteOrderID: "

http://git-wip-us.apache.org/repos/asf/flume/blob/1a2e0d7a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
index 74a2bc8..0f9456b 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/FlumeEventQueue.java
@@ -101,6 +101,7 @@ final class FlumeEventQueue {
       LOG.debug("Checkpoint not required");
       return false;
     }
+    backingStore.beginCheckpoint();
     inflightPuts.serializeAndWrite();
     inflightTakes.serializeAndWrite();
     backingStore.checkpoint();