You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/12 15:53:46 UTC

git commit: S4-40 asynchronously serialize PEs

Updated Branches:
  refs/heads/S4-40 [created] 29a22ef03


S4-40 asynchronously serialize PEs

- offloading this task from the event processing thread
- race conditions prevented through a coordination mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/29a22ef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/29a22ef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/29a22ef0

Branch: refs/heads/S4-40
Commit: 29a22ef0380fec4c355583ef082cdc45711b914e
Parents: 45efb82
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Jan 12 16:53:08 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Jan 12 16:53:08 2012 +0100

----------------------------------------------------------------------
 .../org/apache/s4/ft/CheckpointingCoordinator.java |  135 ++++++++++
 .../src/main/java/org/apache/s4/ft/SafeKeeper.java |  208 +++++++++++----
 .../main/java/org/apache/s4/ft/SaveStateTask.java  |   27 ++-
 .../main/java/org/apache/s4/ft/SerializeTask.java  |   45 +++
 .../java/org/apache/s4/processor/AbstractPE.java   |   16 +-
 .../main/java/org/apache/s4/util/MetricsName.java  |  109 ++++----
 .../test/java/org/apache/s4/ft/RecoveryTest.java   |    6 +-
 7 files changed, 425 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
new file mode 100644
index 0000000..7a20ea7
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.ft;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.processor.AbstractPE;
+
+/**
+ * Prevents the event processing thread and a serialization thread from working on the same PE instance at the same time, which would cause consistency issues during recovery.
+ * 
+ * How it works:
+ * - we keep track of the PE being serialized and the PE being processed
+ * - access to these two references is guarded by the lock owned by the instance of this class
+ * - if the event processing thread receives an event that is handled by a PE currently being serialized, it waits until serialization is complete
+ * - this is expected to happen rarely, because there are many PEs and they don't get checkpointed all the time
+ * - there is a configurable timeout for blocking the event processing thread (if triggered, an error message is logged and processing proceeds)
+ * - a serialization thread waits, re-checking periodically, until the PE it must serialize is no longer being processed
+ */
+public class CheckpointingCoordinator {
+
+	private static final Logger logger = Logger.getLogger(CheckpointingCoordinator.class);
+
+	AbstractPE processing = null; // PE currently acquired for event processing
+	AbstractPE serializing = null; // PE currently acquired for serialization
+	long maxSerializationLockDuration; // maximum wait of the processing side, in milliseconds
+
+	Lock lock = new ReentrantLock(); // guards processing and serializing
+	Condition processingFinished = lock.newCondition();
+	Condition serializingFinished = lock.newCondition();
+
+	public CheckpointingCoordinator(long maxSerializationLockDuration) {
+		this.maxSerializationLockDuration = maxSerializationLockDuration;
+	}
+
+	/** Marks pe as being processed, first waiting at most maxSerializationLockDuration ms for a serialization of pe to finish. */
+	public void acquireForProcessing(AbstractPE pe) {
+		lock.lock();
+		try {
+			// a timeout is reported through the value returned by awaitNanos, not through an exception
+			long remainingNanos = TimeUnit.MILLISECONDS.toNanos(maxSerializationLockDuration);
+			while (serializing == pe && remainingNanos > 0) {
+				if (logger.isTraceEnabled()) {
+					logger.trace("processing must wait for serialization to finish for PE " + pe.getId() + "/" + pe.getKeyValueString());
+				}
+				try {
+					remainingNanos = serializingFinished.awaitNanos(remainingNanos);
+				} catch (InterruptedException e) {
+					Thread.currentThread().interrupt(); // keep the interruption visible to the caller
+					remainingNanos = 0; // stop waiting; handled below like a timeout
+				}
+			}
+			if (serializing == pe) {
+				logger.error("Could not acquire permit for processing after timeout of ["
+						+ maxSerializationLockDuration + "] milliseconds for PE[" + pe.getId() + "/" + pe.getKeyValueString()
+						+ "]\nProceeding anyway, but checkpoint may contain inconsistent value");
+				serializing = null;
+			}
+			processing = pe;
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/** Clears the processing mark if pe holds it and wakes up the threads waiting to serialize; logs a warning otherwise. */
+	public void releaseFromProcessing(AbstractPE pe) {
+		lock.lock();
+		try {
+			if (processing == pe) {
+				processing = null;
+				processingFinished.signalAll(); // waiters re-check their condition in a loop
+			} else {
+				logger.warn("Cannot release from processing thread a PE that is not already in processing state");
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/** Marks pe as being serialized, waiting for as long as pe is marked as being processed. */
+	public void acquireForSerialization(AbstractPE pe) {
+		boolean interrupted = false;
+		lock.lock();
+		try {
+			while (processing == pe) {
+				if (logger.isTraceEnabled()) {
+					logger.trace("serialization must wait for processing to finish for PE " + pe.getId() + "/" + pe.getKeyValueString());
+				}
+				try {
+					processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException e) {
+					interrupted = true; // we still need to make sure it is now safe to serialize
+				}
+			}
+			serializing = pe;
+		} finally {
+			lock.unlock();
+			if (interrupted) {
+				Thread.currentThread().interrupt(); // restore the flag cleared by the exception
+			}
+		}
+	}
+
+	/** Clears the serialization mark if pe holds it and wakes up waiting processing threads; the throws clause is kept for existing callers. */
+	public void releaseFromSerialization(AbstractPE pe) throws InterruptedException {
+		lock.lock();
+		try {
+			if (serializing == pe) {
+				serializing = null;
+				serializingFinished.signalAll(); // waiters re-check their condition in a loop
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
index f47fe44..aa72010 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -17,21 +17,25 @@
  */
 package org.apache.s4.ft;
 
-import org.apache.s4.dispatcher.Dispatcher;
-import org.apache.s4.dispatcher.partitioner.Hasher;
-import org.apache.s4.emitter.CommLayerEmitter;
-import org.apache.s4.processor.AbstractPE;
-import org.apache.s4.serialize.SerializerDeserializer;
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
+import org.apache.s4.dispatcher.Dispatcher;
+import org.apache.s4.dispatcher.partitioner.Hasher;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.logger.Monitor;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.MetricsName;
 
 /**
  * 
@@ -46,7 +50,7 @@ import org.apache.log4j.Logger;
  */
 public class SafeKeeper {
 
-    public enum StorageResultCode {
+    public enum StorageResultCode { 
         SUCCESS, FAILURE
     }
 
@@ -60,15 +64,29 @@ public class SafeKeeper {
     private CountDownLatch signalNodesAvailable = new CountDownLatch(1);
     private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
 
-    ThreadPoolExecutor threadPool;
-
-    int maxWriteThreads = 1;
-    int writeThreadKeepAliveSeconds = 120;
-    int maxOutstandingWriteRequests = 1000;
+    private ThreadPoolExecutor storageThreadPool;
+    private ThreadPoolExecutor serializationThreadPool;
+    
+    private CheckpointingCoordinator processingSerializationSynchro;
+    
+    private Monitor monitor;
+    
+    int storageMaxThreads = 1;
+    int storageThreadKeepAliveSeconds = 120;
+    int storageMaxOutstandingRequests = 1000;
+    
+    int serializationMaxThreads=1;
+    int serializationThreadKeepAliveSeconds = 120;
+    int serializationMaxOutstandingRequests = 1000;
+    
+    long maxSerializationLockTime = 1000;
 
     public SafeKeeper() {
     }
 
+    public void setMonitor(Monitor monitor) {
+        this.monitor = monitor;
+    }
     /**
      * <p>
      * This init() method <b>must</b> be called by the dependency injection
@@ -83,11 +101,16 @@ public class SafeKeeper {
         } catch (InterruptedException e1) {
             e1.printStackTrace();
         }
-        threadPool = new ThreadPoolExecutor(1, maxWriteThreads, writeThreadKeepAliveSeconds, TimeUnit.SECONDS,
-                new ArrayBlockingQueue<Runnable>(maxOutstandingWriteRequests));
-        logger.debug("Started thread pool with maxWriteThreads=[" + maxWriteThreads
-                + "], writeThreadKeepAliveSeconds=[" + writeThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
-                + maxOutstandingWriteRequests + "]");
+        storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests));
+        serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+                new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
+        
+        processingSerializationSynchro = new CheckpointingCoordinator(maxSerializationLockTime);
+
+        logger.debug("Started thread pool with maxWriteThreads=[" + storageMaxThreads
+                + "], writeThreadKeepAliveSeconds=[" + storageThreadKeepAliveSeconds + "], maxOutsandingWriteRequests=["
+                + storageMaxOutstandingRequests + "]");
 
         int nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
         // required wait until nodes are available
@@ -98,36 +121,77 @@ public class SafeKeeper {
             }
             nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
         }
-
+        
         signalNodesAvailable.countDown();
     }
-
+    
     /**
-     * Forwards a call to checkpoint a PE to the backend storage.
-     * 
-     * @param key
-     *            safeKeeperId
-     * @param state
-     *            checkpoint data
+     * Synchronization to prevent race conditions with serialization threads
      */
-    public void saveState(SafeKeeperId safeKeeperId, byte[] serializedState) {
-        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
-        try {
-            threadPool.submit(createSaveStateTask(safeKeeperId, serializedState));
-        } catch (RejectedExecutionException e) {
+    public void acquirePermitForProcessing(AbstractPE pe) {
+    	processingSerializationSynchro.acquireForProcessing(pe);
+    }
+    
+    /**
+     * Notification part of the mechanism for preventing race condition with serialization threads
+     */
+	public void releasePermitForProcessing(AbstractPE pe) {
+		processingSerializationSynchro.releaseFromProcessing(pe);
+	}
+
+
+	/**
+	 * Serializes the state of a PE and stores it to the storage backend. Serialization and storage operations are asynchronous.
+	 * 
+	 * @param pe the PE to checkpoint
+	 * @return a callback for getting notified of the result of the storage operation (notified of a FAILURE right away when the serialization task is rejected)
+	 */
+	public StorageCallback saveState(AbstractPE pe) {
+		StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+		Future<byte[]> futureSerializedState = null;
+		try {
+			futureSerializedState = serializeState(pe, processingSerializationSynchro);
+		} catch (RejectedExecutionException e) {
+			if (monitor!=null) {
+				monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+			}
+			// the maximum capacity is the configured queue bound, not the executor object itself
+			storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+					"Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["+ pe.getId()+"]" +
+							"	Remaining capacity for the serialization task queue is [" + serializationThreadPool.getQueue().remainingCapacity()
+							+ "] ; number of elements is [" + serializationThreadPool.getQueue().size()
+							+ "] ; maximum capacity is [" + serializationMaxOutstandingRequests + "]");
+			return storageCallback;
+		}
+		submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
+		return storageCallback;
+	}
+    
+    private Future<byte[]> serializeState(AbstractPE pe, CheckpointingCoordinator coordinator) {
+        Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe, coordinator));
+    	if(monitor!=null) {
+            monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+        }
+    	return future;
+    }
+    
+    private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+    	try {
+            storageThreadPool.execute(task);
+            if (monitor!=null) {
+                monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
+            }
+    	} catch (RejectedExecutionException e) {
+            if (monitor!=null) {
+                monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
+            }
             storageCallback.storageOperationResult(StorageResultCode.FAILURE,
-                    "Could not submit task to persist checkpoint. Remaining capacity for task queue is ["
-                            + threadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
-                            + threadPool.getQueue().size() + "] ; maximum capacity is [" + maxOutstandingWriteRequests
+                    "Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
+                            + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + storageThreadPool.getQueue().size() + "] ; maximum capacity is [" + storageMaxOutstandingRequests
                             + "]");
         }
     }
 
-    private SaveStateTask createSaveStateTask(SafeKeeperId safeKeeperId, byte[] serializedState) {
-        return new SaveStateTask(safeKeeperId, serializedState, storageCallbackFactory.createStorageCallback(),
-                stateStorage);
-    }
-
     /**
      * Fetches checkpoint data from storage for a given PE
      * 
@@ -232,28 +296,62 @@ public class SafeKeeper {
         this.storageCallbackFactory = storageCallbackFactory;
     }
 
-    public int getMaxWriteThreads() {
-        return maxWriteThreads;
-    }
+	public int getStorageMaxThreads() {
+		return storageMaxThreads;
+	}
 
-    public void setMaxWriteThreads(int maxWriteThreads) {
-        this.maxWriteThreads = maxWriteThreads;
-    }
+	public void setStorageMaxThreads(int storageMaxThreads) {
+		this.storageMaxThreads = storageMaxThreads;
+	}
 
-    public int getWriteThreadKeepAliveSeconds() {
-        return writeThreadKeepAliveSeconds;
-    }
+	public int getStorageThreadKeepAliveSeconds() {
+		return storageThreadKeepAliveSeconds;
+	}
 
-    public void setWriteThreadKeepAliveSeconds(int writeThreadKeepAliveSeconds) {
-        this.writeThreadKeepAliveSeconds = writeThreadKeepAliveSeconds;
-    }
+	public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
+		this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
+	}
 
-    public int getMaxOutstandingWriteRequests() {
-        return maxOutstandingWriteRequests;
-    }
+	public int getStorageMaxOutstandingRequests() {
+		return storageMaxOutstandingRequests;
+	}
 
-    public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
-        this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
-    }
+	public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
+		this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
+	}
+
+	public int getSerializationMaxThreads() {
+		return serializationMaxThreads;
+	}
+
+	public void setSerializationMaxThreads(int serializationMaxThreads) {
+		this.serializationMaxThreads = serializationMaxThreads;
+	}
+
+	public int getSerializationThreadKeepAliveSeconds() {
+		return serializationThreadKeepAliveSeconds;
+	}
+
+	public void setSerializationThreadKeepAliveSeconds(
+			int serializationThreadKeepAliveSeconds) {
+		this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
+	}
+
+	public int getSerializationMaxOutstandingRequests() {
+		return serializationMaxOutstandingRequests;
+	}
+
+	public void setSerializationMaxOutstandingRequests(
+			int serializationMaxOutstandingRequests) {
+		this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
+	}
+
+	public long getMaxSerializationLockTime() {
+		return maxSerializationLockTime;
+	}
+
+	public void setMaxSerializationLockTime(long maxSerializationLockTime) {
+		this.maxSerializationLockTime = maxSerializationLockTime;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
index 4ed79c9..81363a1 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -17,6 +17,11 @@
  */
 package org.apache.s4.ft;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.log4j.Logger;
+
 
 /**
  * 
@@ -26,20 +31,36 @@ package org.apache.s4.ft;
 public class SaveStateTask implements Runnable {
     
     SafeKeeperId safeKeeperId;
-    byte[] state;
+    byte[] serializedState;
+    Future<byte[]> futureSerializedState = null;
     StorageCallback storageCallback;
     StateStorage stateStorage;
     
     public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
         super();
         this.safeKeeperId = safeKeeperId;
-        this.state = state;
+        this.serializedState = state;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+    
+    public SaveStateTask(SafeKeeperId safeKeeperId, Future<byte[]> futureSerializedState, StorageCallback storageCallback, StateStorage stateStorage) {
+    	this.safeKeeperId = safeKeeperId;
+        this.futureSerializedState = futureSerializedState;
         this.storageCallback = storageCallback;
         this.stateStorage = stateStorage;
     }
     
     @Override
     public void run() {
-        stateStorage.saveState(safeKeeperId, state, storageCallback);
+        // store the asynchronously serialized state when there is one, else the state given to the byte[] constructor
+        try {
+            byte[] state = futureSerializedState != null ? futureSerializedState.get() : serializedState;
+            stateStorage.saveState(safeKeeperId, state, storageCallback);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // restore the flag for the thread running this task
+        } catch (ExecutionException e) {
+            Logger.getLogger(SaveStateTask.class).warn("Cannot save checkpoint : " + safeKeeperId, e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
new file mode 100644
index 0000000..2fe2d13
--- /dev/null
+++ b/s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.processor.AbstractPE;
+
+public class SerializeTask implements Callable<byte[]> {
+	// Callable returning the serialized state of a PE; the coordinator's serialization permit is acquired around the call.
+	AbstractPE pe;
+	private CheckpointingCoordinator coordinator;
+	/** Creates a task that serializes the given PE, using the given coordinator to acquire and release the PE. */
+	public SerializeTask(AbstractPE pe, CheckpointingCoordinator coordinator) {
+		super();
+		this.pe = pe;
+		this.coordinator = coordinator;
+	}
+	/** Acquires the PE for serialization, returns pe.serializeState() and releases the PE in all cases. */
+	@Override
+	public byte[] call() throws Exception {
+		try {
+			coordinator.acquireForSerialization(pe);
+			return pe.serializeState();
+		} finally {
+			coordinator.releaseFromSerialization(pe);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
index 58757d3..e4b7109 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -22,6 +22,7 @@ import org.apache.s4.dispatcher.partitioner.KeyInfo;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
 import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
+import org.apache.s4.ft.CheckpointingCoordinator;
 import org.apache.s4.ft.InitiateCheckpointingEvent;
 import org.apache.s4.ft.RecoveryEvent;
 import org.apache.s4.ft.SafeKeeper;
@@ -42,6 +43,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -120,6 +122,7 @@ public abstract class AbstractPE implements Cloneable {
     transient private int checkpointableEventCount = 0;
     transient private int checkpointsBeforePause = -1;
     transient private long checkpointingPauseTimeInMillis;
+    
 
     transient private OverloadDispatcher overloadDispatcher;
 
@@ -206,6 +209,7 @@ public abstract class AbstractPE implements Cloneable {
         this.streamName = streamName;
 
         if (safeKeeper != null) {
+        	safeKeeper.acquirePermitForProcessing(this);
             // initialize checkpointing event flag
             this.isCheckpointingEvent = false;
             if (!recoveryAttempted) {
@@ -241,7 +245,10 @@ public abstract class AbstractPE implements Cloneable {
                     checkpoint();
                 }
             }
-
+        }
+        
+        if (safeKeeper!=null) {
+            safeKeeper.releasePermitForProcessing(this);
         }
     }
 
@@ -539,17 +546,12 @@ public abstract class AbstractPE implements Cloneable {
 
     protected void checkpoint() {
 
-        byte[] serializedState = serializeState();
         // NOTE: assumes pe id is keyvalue from the PE...
-        saveState(getSafeKeeperId(), serializedState);
+        safeKeeper.saveState(this);
         // remove dirty flag
         checkpointable = false;
     }
 
-    private void saveState(SafeKeeperId key, byte[] serializedState) {
-        safeKeeper.saveState(key, serializedState);
-    }
-
     protected void recover() {
         byte[] serializedState = null;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
index b010a29..2fae80d 100644
--- a/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
+++ b/s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -1,53 +1,56 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.s4.util;
-
-public enum MetricsName {
-    // metrics event name
-    S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
-            "S4::S4CoreMetrics"),
-
-    // metrics name
-    low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
-            "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
-            "lll_bad"), // exception can't be caught
-    generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
-            "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
-            "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
-            "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
-            "pec_err"), // exception can't be caught
-    pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
-            "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
-            "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
-            "pe_j_ct"), pe_error_count("pe_err");
-
-    private final String eventShortName;
-
-    private MetricsName(String eventShortName) {
-        this.eventShortName = eventShortName;
-    }
-
-    public String toString() {
-        return eventShortName;
-    }
-
-    public static void main(String[] args) {
-        System.out.println(generic_listener_msg_in_ct.toString());
-
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.util;
+
+public enum MetricsName {
+    // metrics event name
+    S4_APP_METRICS("S4::S4AppMetrics"), S4_EVENT_METRICS("S4::S4EventMetrics"), S4_CORE_METRICS(
+            "S4::S4CoreMetrics"),
+
+    // metrics name
+    low_level_listener_msg_in_ct("lll_in"), low_level_listener_msg_drop_ct(
+            "lll_dr"), low_level_listener_qsz("lll_qsz"), low_level_listener_badmsg_ct(
+            "lll_bad"), // exception can't be caught
+    generic_listener_msg_in_ct("gl_in"), pecontainer_ev_dq_ct("pec_dq"), pecontainer_ev_nq_ct(
+            "pec_nq"), pecontainer_msg_drop_ct("pec_dr"), pecontainer_qsz(
+            "pec_qsz"), pecontainer_qsz_w("pec_qsz_w"), pecontainer_ev_process_ct(
+            "pec_pr"), pecontainer_pe_ct("pec_pe"), pecontainer_ev_err_ct(
+            "pec_err"), // exception can't be caught
+    pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
+            "lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
+            "lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
+            "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"), 
+            checkpointing_dropped_from_storage_queue("cp_sto_dr"), 
+            checkpointing_added_to_serialization_queue("cp_ser_in"), 
+            checkpointing_added_to_storage_queue("cp_sto_in");
+
+	private final String eventShortName;
+
+	private MetricsName(String eventShortName) {
+		this.eventShortName = eventShortName;
+	}
+
+	public String toString() {
+		return eventShortName;
+	}
+
+	public static void main(String[] args) {
+		System.out.println(generic_listener_msg_in_ct.toString());
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/29a22ef0/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
index 1ca5652..0265ba0 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -53,7 +53,7 @@ public class RecoveryTest extends S4TestCase {
         forkedS4App = TestUtils.forkS4App(getClass().getName(),
                 "s4_core_conf_fs_backend.xml");
         // TODO synchro
-        Thread.sleep(4000);
+        Thread.sleep(5000);
 
         CountDownLatch signalValue1Set = new CountDownLatch(1);
         TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
@@ -75,13 +75,13 @@ public class RecoveryTest extends S4TestCase {
         // trigger checkpoint
         gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
                 "Stream1", 0);
-        signalCheckpointed.await();
+        signalCheckpointed.await(10, TimeUnit.SECONDS);
         // signalCheckpointAddedByBK.await();
 
         signalValue1Set = new CountDownLatch(1);
         TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
         gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
-        signalValue1Set.await();
+        signalValue1Set.await(10, TimeUnit.SECONDS);
         Assert.assertEquals("value1=message1b ; value2=",
                 TestUtils.readFile(StatefulTestPE.DATA_FILE));