You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/18 16:06:03 UTC

[1/2] git commit: Added checkpointing framework + tests - also simplified some other tests - added optional checkpointing in twitter example

Updated Branches:
  refs/heads/S4-11 [created] ab1fca5ec


Added checkpointing framework + tests
- also simplified some other tests
- added optional checkpointing in twitter example


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/ab1fca5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/ab1fca5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/ab1fca5e

Branch: refs/heads/S4-11
Commit: ab1fca5ec9bc8c60bde4e31afdc595b857dadf7f
Parents: ecbfd42
Author: Matthieu Morel <mm...@apache.org>
Authored: Wed Jul 18 18:02:55 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Wed Jul 18 18:02:55 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/base/Event.java    |   12 +
 .../java/org/apache/s4/base/util/S4RLoader.java    |    7 +-
 .../java/org/apache/s4/comm/QueueingListener.java  |    1 +
 .../java/org/apache/s4/comm/tcp/TCPEmitter.java    |    3 +-
 .../java/org/apache/s4/comm/tcp/TCPListener.java   |    3 +-
 .../java/org/apache/s4/comm/udp/UDPListener.java   |    1 +
 .../java/org/apache/s4/fixtures/CommTestUtils.java |   29 +--
 subprojects/s4-core/s4-core.gradle                 |    1 +
 .../src/main/java/org/apache/s4/core/App.java      |   18 +-
 .../java/org/apache/s4/core/DefaultCoreModule.java |    6 +
 .../src/main/java/org/apache/s4/core/Main.java     |    2 +-
 .../java/org/apache/s4/core/ProcessingElement.java |  223 +++++++++---
 .../src/main/java/org/apache/s4/core/Stream.java   |    7 +-
 .../java/org/apache/s4/core/ft/CheckpointId.java   |  124 +++++++
 .../org/apache/s4/core/ft/CheckpointingConfig.java |   55 +++
 .../apache/s4/core/ft/CheckpointingFramework.java  |   52 +++
 .../org/apache/s4/core/ft/CheckpointingTask.java   |   33 ++
 .../s4/core/ft/DefaultFileSystemStateStorage.java  |  179 +++++++++
 .../main/java/org/apache/s4/core/ft/FetchTask.java |   36 ++
 .../ft/FileSystemBackendCheckpointingModule.java   |   15 +
 .../s4/core/ft/LoggingStorageCallbackFactory.java  |   54 +++
 .../s4/core/ft/NoOpCheckpointingFramework.java     |   26 ++
 .../java/org/apache/s4/core/ft/SafeKeeper.java     |  282 +++++++++++++++
 .../java/org/apache/s4/core/ft/SaveStateTask.java  |   75 ++++
 .../java/org/apache/s4/core/ft/SerializeTask.java  |   46 +++
 .../java/org/apache/s4/core/ft/StateStorage.java   |   72 ++++
 .../org/apache/s4/core/ft/StorageCallback.java     |   37 ++
 .../apache/s4/core/ft/StorageCallbackFactory.java  |   35 ++
 .../test/java/org/apache/s4/core/TriggerTest.java  |   12 +-
 ...duleWithUnrespondingFetchingStorageBackend.java |   20 +
 .../org/apache/s4/core/ft/CheckpointingTest.java   |  153 ++++++++
 .../java/org/apache/s4/core/ft/FTWordCountApp.java |   18 +
 .../org/apache/s4/core/ft/FTWordCountTest.java     |  131 +++++++
 ...ndWithZKStorageCallbackCheckpointingModule.java |   50 +++
 .../java/org/apache/s4/core/ft/RecoveryTest.java   |  158 ++++++++
 .../core/ft/S4AppWithCountBasedCheckpointing.java  |   24 ++
 .../s4/core/ft/S4AppWithManualCheckpointing.java   |   29 ++
 .../core/ft/S4AppWithTimeBasedCheckpointing.java   |   28 ++
 .../java/org/apache/s4/core/ft/StatefulTestPE.java |   97 +++++
 .../core/ft/StorageWithUnrespondingFetching.java   |   45 +++
 .../org/apache/s4/core/triggers/TriggerablePE.java |    6 +-
 .../org/apache/s4/core/triggers/TriggeredApp.java  |   12 +-
 .../apache/s4/deploy/TestAutomaticDeployment.java  |   62 ++--
 .../java/org/apache/s4/fixtures/CoreTestUtils.java |    5 -
 .../java/org/apache/s4/fixtures/SocketAdapter.java |  101 -----
 .../org/apache/s4/wordcount/SentenceKeyFinder.java |   12 +-
 .../java/org/apache/s4/wordcount/WordCountApp.java |   32 +--
 .../org/apache/s4/wordcount/WordCountTest.java     |   62 ++--
 .../org/apache/s4/wordcount/WordSplitterPE.java    |   15 +-
 .../java/org/apache/s4/deploy/SocketAdapter.java   |   80 ----
 .../main/java/org/apache/s4/deploy/TestApp.java    |   27 +-
 .../org/apache/s4/example/twitter/TopNTopicPE.java |    4 +
 .../s4/example/twitter/TopicCountAndReportPE.java  |    8 +-
 .../s4/example/twitter/TwitterCounterApp.java      |   30 +-
 54 files changed, 2235 insertions(+), 420 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
index 7efadcc..1f5d7e4 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java
@@ -188,4 +188,16 @@ public class Event {
         }
 
     }
+
+    @Override
+    public String toString() {
+        Map<String, String> attributesAsMap = getAttributesAsMap();
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (Map.Entry<String, String> entry : attributesAsMap.entrySet()) {
+            sb.append("{" + entry.getKey() + ";" + entry.getValue() + "},");
+        }
+        sb.append("]");
+        return sb.toString();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
index f75a867..8d3e062 100644
--- a/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
+++ b/subprojects/s4-base/src/main/java/org/apache/s4/base/util/S4RLoader.java
@@ -18,7 +18,12 @@ public class S4RLoader extends URLClassLoader {
     }
 
     public Class<?> loadGeneratedClass(String name, byte[] bytes) {
-        return defineClass(name, bytes, 0, bytes.length);
+        Class<?> clazz = findLoadedClass(name);
+        if (clazz == null) {
+            return defineClass(name, bytes, 0, bytes.length);
+        } else {
+            return clazz;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
index 46f0a8e..2a9dc5c 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/QueueingListener.java
@@ -46,6 +46,7 @@ public class QueueingListener implements Listener, Runnable {
             // System.out.println("QueueingListener: About to take message from queue");
             return queue.take();
         } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
index 4472910..d2aa64f 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java
@@ -132,6 +132,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
         } catch (InterruptedException ie) {
             logger.error(String.format("Interrupted while connecting to %s:%d", clusterNode.getMachineName(),
                     clusterNode.getPort()));
+            Thread.currentThread().interrupt();
         }
         return false;
     }
@@ -182,7 +183,7 @@ public class TCPEmitter implements Emitter, ClusterChangeListener {
             bootstrap.releaseExternalResources();
         } catch (InterruptedException ie) {
             logger.error("Interrupted while closing");
-            ie.printStackTrace();
+            Thread.currentThread().interrupt();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
index 3e905e6..d7d49bd 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java
@@ -80,6 +80,7 @@ public class TCPListener implements Listener {
             byte[] msg = handoffQueue.take();
             return msg;
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             return null;
         }
     }
@@ -93,7 +94,7 @@ public class TCPListener implements Listener {
         try {
             channels.close().await();
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            Thread.currentThread().interrupt();
         }
         bootstrap.releaseExternalResources();
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
index 316999c..43bb470 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java
@@ -74,6 +74,7 @@ public class UDPListener implements Listener, Runnable {
         try {
             return handoffQueue.take();
         } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index 2791da6..63e32b5 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -5,9 +5,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.io.PrintWriter;
 import java.net.InetSocketAddress;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -41,11 +39,9 @@ public class CommTestUtils {
 
     public static final int ZK_PORT = 2181;
     public static final String ZK_STRING = "localhost:" + ZK_PORT;
-    public static final int INITIAL_BOOKIE_PORT = 5000;
     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
             + "storage");
-    public static ServerSocket serverSocket;
     static {
         logger.info("Storage dir: " + DEFAULT_STORAGE_DIR);
     }
@@ -233,6 +229,7 @@ public class CommTestUtils {
             try {
                 Thread.sleep(250);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 // ignore
             }
         }
@@ -281,6 +278,7 @@ public class CommTestUtils {
             try {
                 Thread.sleep(250);
             } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                 // ignore
             }
         }
@@ -295,12 +293,6 @@ public class CommTestUtils {
 
     }
 
-    public static void stopSocketAdapter() throws IOException {
-        if (serverSocket != null) {
-            serverSocket.close();
-        }
-    }
-
     /**
      * gradle and eclipse have different directories for output files This is justified here
      * http://gradle.1045684.n5.nabble.com/Changing-default-IDE-output-directories-td3335478.html#a3337433
@@ -330,21 +322,4 @@ public class CommTestUtils {
 
     }
 
-    public static void injectIntoStringSocketAdapter(String string) throws IOException {
-        Socket socket = null;
-        PrintWriter writer = null;
-        try {
-            socket = new Socket("localhost", 12000);
-            writer = new PrintWriter(socket.getOutputStream(), true);
-            writer.println(string);
-        } catch (IOException e) {
-            e.printStackTrace();
-            System.exit(-1);
-        } finally {
-            if (socket != null) {
-                socket.close();
-            }
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/s4-core.gradle
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/s4-core.gradle b/subprojects/s4-core/s4-core.gradle
index ad3bbbe..c3a3f52 100644
--- a/subprojects/s4-core/s4-core.gradle
+++ b/subprojects/s4-core/s4-core.gradle
@@ -20,6 +20,7 @@ dependencies {
     compile project(":s4-base")
     compile project(":s4-comm")
     compile project(path: ':s4-comm', configuration: 'tests')
+    compile libraries.commons_codec
     compile libraries.jcommander
     testCompile project(path: ':s4-comm', configuration: 'tests')
     testCompile libraries.gradle_base_services

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 0beec16..bc20e98 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -26,7 +26,7 @@ import org.apache.s4.base.KeyFinder;
 import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.comm.serialize.KryoSerDeser;
 import org.apache.s4.comm.topology.RemoteStreams;
-import org.apache.s4.core.App.ClockType;
+import org.apache.s4.core.ft.CheckpointingFramework;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +34,10 @@ import com.google.common.collect.Maps;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
 
-/*
- * Container base class to hold all processing elements. We will implement administrative methods here.
+/**
+ * Container base class to hold all processing elements.
+ *
+ * It is also where one defines the application graph: PE prototypes, internal streams, input and output streams.
  */
 public abstract class App {
 
@@ -73,6 +75,10 @@ public abstract class App {
     @Named("cluster.name")
     String clusterName;
 
+    // default is NoOpCheckpointingFramework
+    @Inject
+    CheckpointingFramework checkpointingFramework;
+
     // serialization uses the application class loader
     private SerializerDeserializer serDeser = new KryoSerDeser(getClass().getClassLoader());
 
@@ -111,7 +117,7 @@ public abstract class App {
     }
 
     /* Should only be used within the core package. */
-    public void addStream(Streamable stream) {
+    public void addStream(Streamable<Event> stream) {
         streams.add(stream);
     }
 
@@ -262,6 +268,10 @@ public abstract class App {
         return serDeser;
     }
 
+    public CheckpointingFramework getCheckpointingFramework() {
+        return checkpointingFramework;
+    }
+
     /**
      * Creates a stream with a specific key finder. The event is delivered to the PE instances in the target PE
      * prototypes by key.

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
index b740a21..d441d59 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java
@@ -11,6 +11,8 @@ import org.apache.s4.base.SerializerDeserializer;
 import org.apache.s4.base.util.S4RLoaderFactory;
 import org.apache.s4.comm.DefaultHasher;
 import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.core.ft.CheckpointingFramework;
+import org.apache.s4.core.ft.NoOpCheckpointingFramework;
 import org.apache.s4.deploy.DeploymentManager;
 import org.apache.s4.deploy.DistributedDeploymentManager;
 import org.slf4j.Logger;
@@ -59,6 +61,10 @@ public class DefaultCoreModule extends AbstractModule {
         bind(DeploymentManager.class).to(DistributedDeploymentManager.class);
 
         bind(S4RLoaderFactory.class);
+
+        // For enabling checkpointing, one needs to use a custom module, such as
+        // org.apache.s4.core.ft.FileSystemBackendCheckpointingModule
+        bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);
     }
 
     private void loadProperties(Binder binder) {

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 5f1375b..e01fd33 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -160,7 +160,7 @@ public class Main {
         @Parameter(names = "-appClass", description = "App class to load. This will disable dynamic downloading but allows to start apps directly. These app classes must have been loaded first, usually through a custom module.", required = false, hidden = true)
         String appClass = null;
 
-        @Parameter(names = "-extraModulesClasses", description = "additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
+        @Parameter(names = { "-extraModulesClasses", "-emc" }, description = "Additional configuration modules (they will be instantiated through their constructor without arguments).", variableArity = true, required = false, hidden = true)
         List<String> extraModulesClasses = new ArrayList<String>();
 
         @Parameter(names = { "-namedStringParameters", "-p" }, description = "Inline configuration parameters, taking precedence over homonymous configuration parameters from configuration files. Syntax: '-namedStringParameters={name1=value1},{name2=value2} '", hidden = false, converter = InlineConfigParameterConverter.class)

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
index 4dced75..10734ca 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java
@@ -1,20 +1,7 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
 package org.apache.s4.core;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Timer;
@@ -25,6 +12,10 @@ import java.util.concurrent.TimeUnit;
 import net.jcip.annotations.ThreadSafe;
 
 import org.apache.s4.base.Event;
+import org.apache.s4.core.ft.CheckpointId;
+import org.apache.s4.core.ft.CheckpointingConfig;
+import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
+import org.apache.s4.core.ft.CheckpointingTask;
 import org.apache.s4.core.gen.OverloadDispatcher;
 import org.apache.s4.core.gen.OverloadDispatcherGenerator;
 import org.slf4j.Logger;
@@ -34,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.MapMaker;
 import com.google.common.collect.Maps;
@@ -94,34 +86,41 @@ import com.google.common.collect.Maps;
  */
 public abstract class ProcessingElement implements Cloneable {
 
-    private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
-    private static final String SINGLETON = "singleton";
+    transient private static final Logger logger = LoggerFactory.getLogger(ProcessingElement.class);
+    transient private static final String SINGLETON = "singleton";
 
-    protected App app;
+    transient protected App app;
 
     /*
      * This maps holds all the instances. We make it package private to prevent concrete classes from updating the
      * collection.
      */
-    Cache<String, ProcessingElement> peInstances;
+    transient Cache<String, ProcessingElement> peInstances;
 
     /* This map is initialized in the prototype and cloned to instances. */
-    Map<Class<? extends Event>, Trigger> triggers;
+    transient Map<Class<? extends Event>, Trigger> triggers;
 
     /* PE instance id. */
     String id = "";
 
     /* Private fields. */
-    private ProcessingElement pePrototype;
-    private boolean haveTriggers = false;
-    private long timerIntervalInMilliseconds = 0;
-    private Timer timer;
-    private boolean isPrototype = true;
-    private boolean isThreadSafe = false;
-    private String name = null;
-    private boolean isSingleton = false;
-
-    private transient OverloadDispatcher overloadDispatcher;
+    transient private ProcessingElement pePrototype;
+    transient private boolean haveTriggers = false;
+    transient private long timerIntervalInMilliseconds = 0;
+    transient private Timer triggerTimer;
+    transient private Timer checkpointingTimer;
+    transient private boolean isPrototype = true;
+    transient private boolean isThreadSafe = false;
+    transient private String name = null;
+    transient private boolean isSingleton = false;
+    transient long eventCount = 0;
+
+    transient private OverloadDispatcher overloadDispatcher;
+    transient private boolean recoveryAttempted = false;
+    transient private boolean dirty = false;
+
+    transient private CheckpointingConfig checkpointingConfig = new CheckpointingConfig.Builder(CheckpointingMode.NONE)
+            .build();
 
     protected ProcessingElement() {
         OverloadDispatcherGenerator oldg = new OverloadDispatcherGenerator(this.getClass());
@@ -211,7 +210,7 @@ public abstract class ProcessingElement implements Cloneable {
         return peInstances.size();
     }
 
-    Map<String, ProcessingElement> getPEInstances() {
+    public Map<String, ProcessingElement> getPEInstances() {
         return peInstances.asMap();
     }
 
@@ -314,8 +313,9 @@ public abstract class ProcessingElement implements Cloneable {
         /* Skip trigger checking overhead if there are no triggers. */
         haveTriggers = true;
 
-        if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS)
+        if (timeUnit != null && timeUnit != TimeUnit.MILLISECONDS) {
             interval = timeUnit.convert(interval, TimeUnit.MILLISECONDS);
+        }
 
         Trigger config = new Trigger(numEvents, interval);
 
@@ -374,14 +374,14 @@ public abstract class ProcessingElement implements Cloneable {
 
         Preconditions.checkArgument(isPrototype, "This method can only be used on the PE prototype. Trigger not set.");
 
-        if (timer != null) {
-            timer.cancel();
+        if (triggerTimer != null) {
+            triggerTimer.cancel();
         }
 
         if (interval == 0)
             return this;
 
-        timer = new Timer();
+        triggerTimer = new Timer();
         return this;
     }
 
@@ -406,8 +406,11 @@ public abstract class ProcessingElement implements Cloneable {
         } else {
             object = this;
         }
-
         synchronized (object) {
+            if (!recoveryAttempted) {
+                recover();
+                recoveryAttempted = true;
+            }
 
             /* Dispatch onEvent() method. */
             overloadDispatcher.dispatchEvent(this, event);
@@ -416,9 +419,28 @@ public abstract class ProcessingElement implements Cloneable {
             if (haveTriggers && isTrigger(event)) {
                 overloadDispatcher.dispatchTrigger(this, event);
             }
+
+            eventCount++;
+
+            setDirty(true);
+
+            if (isCheckpointable()) {
+                checkpoint();
+            }
         }
     }
 
+    protected boolean isCheckpointable() {
+        return getApp().checkpointingFramework.isCheckpointable(this);
+    }
+
+    public void checkpoint() {
+
+        getApp().getCheckpointingFramework().saveState(this);
+        // remove dirty flag
+        dirty = false;
+    }
+
     private boolean isTrigger(Event event) {
         return isTrigger(event, event.getClass());
     }
@@ -466,8 +488,8 @@ public abstract class ProcessingElement implements Cloneable {
     protected void removeAll() {
 
         /* Close resources in prototype. */
-        if (timer != null) {
-            timer.cancel();
+        if (triggerTimer != null) {
+            triggerTimer.cancel();
             logger.info("Timer stopped.");
         }
 
@@ -503,12 +525,22 @@ public abstract class ProcessingElement implements Cloneable {
         }
 
         /* Start timer. */
-        if (timer != null) {
-            timer.schedule(new OnTimeTask(), 0, timerIntervalInMilliseconds);
-            logger.debug("Started timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
+        if (triggerTimer != null) {
+            triggerTimer
+                    .scheduleAtFixedRate(new OnTimeTask(), timerIntervalInMilliseconds, timerIntervalInMilliseconds);
+            logger.debug("Started trigger timer for PE prototype [{}], ID [{}] with interval [{}].", new String[] {
                     this.getClass().getName(), id, String.valueOf(timerIntervalInMilliseconds) });
         }
 
+        if (checkpointingConfig.mode == CheckpointingMode.TIME) {
+            checkpointingTimer = new Timer();
+            checkpointingTimer.scheduleAtFixedRate(new CheckpointingTask(this),
+                    checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency),
+                    checkpointingConfig.timeUnit.toMillis(checkpointingConfig.frequency));
+            logger.debug("Started checkpointing timer for PE prototype [{}], ID [{}] with interval [{}].",
+                    new String[] { this.getClass().getName(), id, String.valueOf(checkpointingConfig.frequency) });
+        }
+
         /* Check if this PE is annotated as thread safe. */
         if (getClass().isAnnotationPresent(ThreadSafe.class) == true) {
 
@@ -535,7 +567,7 @@ public abstract class ProcessingElement implements Cloneable {
             }
             return peInstances.get(id);
         } catch (ExecutionException e) {
-            logger.error("Problem when trying to create a PE instance.", e);
+            logger.error("Problem when trying to create a PE instance for id {}", id, e);
         }
         return null;
     }
@@ -544,8 +576,16 @@ public abstract class ProcessingElement implements Cloneable {
      * Get all the local instances. See notes in {@link #getInstanceForKey(String) getLocalInstanceForKey}
      */
     public Collection<ProcessingElement> getInstances() {
-
-        return peInstances.asMap().values();
+        try {
+            if (isSingleton) {
+                return ImmutableList.of(peInstances.get(SINGLETON));
+            } else {
+                return peInstances.asMap().values();
+            }
+        } catch (ExecutionException e) {
+            logger.error("Problem when trying to create a PE instance for id {}", id, e);
+            return null;
+        }
     }
 
     /**
@@ -672,6 +712,64 @@ public abstract class ProcessingElement implements Cloneable {
         app.peByName.put(name, this);
     }
 
+    public byte[] serializeState() {
+        return getApp().getSerDeser().serialize(this);
+    }
+
+    public ProcessingElement deserializeState(byte[] loadedState) {
+        return (ProcessingElement) getApp().getSerDeser().deserialize(loadedState);
+    }
+
+    public void restoreState(ProcessingElement oldState) {
+        restoreFieldsForClass(oldState.getClass(), oldState);
+    }
+
+    protected void recover() {
+        byte[] serializedState = null;
+        try {
+            serializedState = getApp().getCheckpointingFramework().fetchSerializedState(new CheckpointId(this));
+        } catch (RuntimeException e) {
+            logger.error("Cannot fetch serialized stated for [{}/{}]: {}", new String[] {
+                    getPrototype().getClass().getName(), getId(), e.getMessage() });
+        }
+        if (serializedState == null) {
+            return;
+        }
+        try {
+            ProcessingElement peInOldState = deserializeState(serializedState);
+            restoreState(peInOldState);
+        } catch (RuntimeException e) {
+            logger.error("Cannot restore state for key [" + new CheckpointId(this) + "]: " + e.getMessage(), e);
+        }
+    }
+
+    private void restoreFieldsForClass(Class<?> currentInOldStateClassHierarchy, ProcessingElement oldState) {
+        if (!ProcessingElement.class.isAssignableFrom(currentInOldStateClassHierarchy)) {
+            return;
+        } else {
+            Field[] fields = oldState.getClass().getDeclaredFields();
+            for (Field field : fields) {
+                if (!Modifier.isTransient(field.getModifiers()) && !Modifier.isStatic(field.getModifiers())) {
+                    if (!Modifier.isPublic(field.getModifiers())) {
+                        field.setAccessible(true);
+                    }
+                    try {
+                        // TODO use reflectasm
+                        field.set(this, field.get(oldState));
+                    } catch (IllegalArgumentException e) {
+                        logger.error("Cannot recover old state for this PE [{}]", e);
+                        return;
+                    } catch (IllegalAccessException e) {
+                        logger.error("Cannot recover old state for this PE [{}]", e);
+                        return;
+                    }
+
+                }
+            }
+            restoreFieldsForClass(currentInOldStateClassHierarchy.getSuperclass(), oldState);
+        }
+    }
+
     class Trigger {
         final long intervalInMilliseconds;
         final int intervalInEvents;
@@ -709,4 +807,41 @@ public abstract class ProcessingElement implements Cloneable {
             return active;
         }
     }
+
+    /**
+     * @return current value of this instance's {@code eventCount} counter (NOTE(review): presumably the number of
+     *         events processed by this PE instance — confirm)
+     */
+    public long getEventCount() {
+        return this.eventCount;
+    }
+
+    /**
+     * @return the dirty flag of this PE instance; time-based checkpointing only checkpoints instances for which this
+     *         returns {@code true}
+     */
+    public boolean isDirty() {
+        return this.dirty;
+    }
+
+    /**
+     * Describes this PE as {@code <class name>/<id> ;}, followed by {@code singleton ;} for singletons, the
+     * thread-safety flag and the timer interval.
+     */
+    @Override
+    public String toString() {
+        StringBuilder description = new StringBuilder();
+        description.append(getClass().getName()).append('/').append(getId()).append(" ;");
+        if (isSingleton) {
+            description.append("singleton ;");
+        }
+        String threadSafety = isThreadSafe ? "IS thread-safe ;" : "Not thread-safe ;";
+        description.append(threadSafety);
+        description.append("timerInterval=").append(timerIntervalInMilliseconds).append(" ;");
+        return description.toString();
+    }
+
+    /**
+     * @return the checkpointing configuration currently attached to this PE
+     */
+    public CheckpointingConfig getCheckpointingConfig() {
+        return this.checkpointingConfig;
+    }
+
+    /**
+     * Replaces the checkpointing configuration of this PE.
+     *
+     * @param checkpointingConfig
+     *            new configuration; stored as-is
+     */
+    public void setCheckpointingConfig(CheckpointingConfig checkpointingConfig) {
+        this.checkpointingConfig = checkpointingConfig;
+    }
+
+    /**
+     * Sets the dirty flag of this PE instance.
+     * <p>
+     * By default, the state of the PE instance is considered dirty whenever it has processed an event. Some events may
+     * actually leave the state of the PE unchanged: subclasses can override {@link #isDirty()} to accommodate such
+     * user-specific behaviors.
+     * </p>
+     *
+     * @param dirty
+     *            new value of the dirty flag
+     */
+    public void setDirty(boolean dirty) {
+        this.dirty = dirty;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
index e01191b..b76102a 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java
@@ -187,9 +187,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
                         .serialize(event)));
             }
         } catch (InterruptedException e) {
-            e.printStackTrace();
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            System.exit(-1);
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -201,9 +200,8 @@ public class Stream<T extends Event> implements Runnable, Streamable {
         try {
             queue.put(event);
         } catch (InterruptedException e) {
-            e.printStackTrace();
             logger.error("Interrupted while waiting to put an event in the queue: {}.", e.getMessage());
-            System.exit(-1);
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -305,6 +303,7 @@ public class Stream<T extends Event> implements Runnable, Streamable {
             } catch (InterruptedException e) {
                 logger.info("Closing stream {}.", name);
                 receiver.removeStream(this);
+                Thread.currentThread().interrupt();
                 return;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
new file mode 100644
index 0000000..290d46a
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointId.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * <p>
+ * Identifier of PEs. It is used to identify checkpointed PEs in the storage backend.
+ * </p>
+ * <p>
+ * An identifier is made of a prototype id (the class name of the PE prototype, when built from a PE instance) and a
+ * key (the id of the PE instance). The storage backend is responsible for converting this identifier to whatever
+ * internal representation is most suitable for it.
+ * </p>
+ * <p>
+ * This class provides methods for getting a compact String representation of the identifier (
+ * {@code [prototypeId];[key]}) and for creating an identifier from such a String representation.
+ * </p>
+ *
+ */
+public class CheckpointId {
+
+    private String prototypeId;
+    private String key;
+
+    /**
+     * Matches the output of {@link #getStringRepresentation()}. Components must not contain whitespace.
+     */
+    private static final Pattern STRING_REPRESENTATION_PATTERN = Pattern.compile("\\[(\\S*)\\];\\[(\\S*)\\]");
+
+    /**
+     * Creates an identifier whose prototype id and key are both {@code null}.
+     */
+    public CheckpointId() {
+    }
+
+    /**
+     * Creates the identifier of a PE instance.
+     *
+     * @param pe
+     *            the PE instance; the class name of its prototype becomes the prototype id, and its
+     *            {@link ProcessingElement#getId() id} becomes the key
+     */
+    public CheckpointId(ProcessingElement pe) {
+        this.prototypeId = pe.getPrototype().getClass().getName();
+        this.key = pe.getId();
+    }
+
+    /**
+     *
+     * @param prototypeID
+     *            identifier of the PE prototype (the class name of the prototype, as used by
+     *            {@link #CheckpointId(ProcessingElement)})
+     * @param key
+     *            keyed attribute(s): the id of the PE instance as returned by {@link ProcessingElement#getId() getId()}
+     */
+    public CheckpointId(String prototypeID, String key) {
+        this.prototypeId = prototypeID;
+        this.key = key;
+    }
+
+    /**
+     * Parses an identifier from its {@link #getStringRepresentation() string representation}. Empty components are
+     * mapped to {@code null}, mirroring how {@link #getStringRepresentation()} renders {@code null} components.
+     *
+     * @param keyAsString
+     *            a string containing {@code [prototypeId];[key]}, where neither component contains whitespace
+     * @throws IllegalArgumentException
+     *             if {@code keyAsString} does not contain such a representation
+     */
+    public CheckpointId(String keyAsString) {
+        Matcher matcher = STRING_REPRESENTATION_PATTERN.matcher(keyAsString);
+        // group() may only be called after a successful match
+        if (!matcher.find()) {
+            throw new IllegalArgumentException("Invalid checkpoint id representation: [" + keyAsString + "]");
+        }
+        prototypeId = emptyToNull(matcher.group(1));
+        key = emptyToNull(matcher.group(2));
+    }
+
+    private static String emptyToNull(String value) {
+        return value.length() == 0 ? null : value;
+    }
+
+    private static String nullToEmpty(String value) {
+        return value == null ? "" : value;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public String getPrototypeId() {
+        return prototypeId;
+    }
+
+    @Override
+    public String toString() {
+        return "[PROTO_ID];[KEY] --> " + getStringRepresentation();
+    }
+
+    /**
+     * @return compact representation {@code [prototypeId];[key]}, with {@code null} components rendered as empty
+     *         strings; parseable by {@link #CheckpointId(String)}
+     */
+    public String getStringRepresentation() {
+        return "[" + nullToEmpty(prototypeId) + "];[" + nullToEmpty(key) + "]";
+    }
+
+    @Override
+    public int hashCode() {
+        return getStringRepresentation().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CheckpointId other = (CheckpointId) obj;
+        if (key == null) {
+            if (other.key != null) {
+                return false;
+            }
+        } else if (!key.equals(other.key)) {
+            return false;
+        }
+        if (prototypeId == null) {
+            if (other.prototypeId != null) {
+                return false;
+            }
+        } else if (!prototypeId.equals(other.prototypeId)) {
+            return false;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
new file mode 100644
index 0000000..eec765d
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingConfig.java
@@ -0,0 +1,55 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Checkpointing configuration: event count based vs time interval, frequency. Use the {@link Builder} class to build
+ * instances.
+ *
+ */
+public class CheckpointingConfig {
+
+    /**
+     * Identifies the kind of checkpointing: time based, event count, or no checkpointing
+     *
+     */
+    public static enum CheckpointingMode {
+        TIME, EVENT_COUNT, NONE
+    }
+
+    /** Kind of checkpointing. */
+    public final CheckpointingMode mode;
+    /** Checkpointing frequency, interpreted according to {@link #mode} (events, or amount of {@link #timeUnit}). */
+    public final int frequency;
+    /** Unit of {@link #frequency} for time-based checkpointing; defaults to milliseconds. */
+    public final TimeUnit timeUnit;
+
+    private CheckpointingConfig(CheckpointingMode mode, int frequency, TimeUnit timeUnit) {
+        this.mode = mode;
+        this.frequency = frequency;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Builds {@link CheckpointingConfig} instances.
+     */
+    public static class Builder {
+        private CheckpointingMode mode;
+        private int frequency;
+        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
+
+        public Builder(CheckpointingMode mode) {
+            this.mode = mode;
+        }
+
+        public Builder frequency(int frequency) {
+            this.frequency = frequency;
+            return this;
+        }
+
+        public Builder timeUnit(TimeUnit timeUnit) {
+            this.timeUnit = timeUnit;
+            return this;
+        }
+
+        /**
+         * @return the configuration
+         * @throws IllegalArgumentException
+         *             if the mode is {@code null}, if the frequency is not strictly positive while checkpointing is
+         *             enabled, or if the time unit is {@code null} for time-based checkpointing
+         */
+        public CheckpointingConfig build() {
+            if (mode == null) {
+                throw new IllegalArgumentException("Checkpointing mode must be specified");
+            }
+            // a non-positive frequency is meaningless (and unusable) as soon as checkpointing is enabled
+            if (mode != CheckpointingMode.NONE && frequency <= 0) {
+                throw new IllegalArgumentException("Checkpointing frequency must be strictly positive for mode "
+                        + mode + ", got: " + frequency);
+            }
+            if (mode == CheckpointingMode.TIME && timeUnit == null) {
+                throw new IllegalArgumentException("Time unit must be specified for time-based checkpointing");
+            }
+            return new CheckpointingConfig(mode, frequency, timeUnit);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
new file mode 100644
index 0000000..48b4616
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingFramework.java
@@ -0,0 +1,52 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+import com.google.inject.ImplementedBy;
+
+/**
+ * Functionalities offered by the checkpointing framework: saving the state of PEs, fetching saved state back, and
+ * deciding whether a PE should be checkpointed.
+ * <p>
+ * When no explicit binding is configured, Guice uses {@link NoOpCheckpointingFramework}.
+ * </p>
+ */
+@ImplementedBy(NoOpCheckpointingFramework.class)
+public interface CheckpointingFramework {
+
+    /**
+     * Outcome of a storage operation.
+     */
+    enum StorageResultCode {
+        SUCCESS, FAILURE
+    }
+
+    /**
+     * Serializes the state of a PE and stores it in the storage backend. Serialization and storage operations are
+     * asynchronous.
+     *
+     * @param pe
+     *            PE instance whose state is saved
+     * @return a callback for getting notified of the result of the storage operation
+     */
+    StorageCallback saveState(ProcessingElement pe);
+
+    /**
+     * Fetches the checkpoint data of a given PE from storage.
+     *
+     * @param key
+     *            identifier of the checkpointed PE
+     * @return checkpoint data
+     */
+    byte[] fetchSerializedState(CheckpointId key);
+
+    /**
+     * Tells whether a PE should be checkpointed, considering:
+     * <ul>
+     * <li>whether checkpointing is enabled</li>
+     * <li>whether the PE is "dirty"</li>
+     * <li>the checkpointing frequency settings</li>
+     * </ul>
+     * This serves count-based checkpointing intervals; time-based checkpointing relies on the dirty flag when
+     * triggered.
+     *
+     * @param pe
+     *            processing element to evaluate
+     * @return true if checkpointable according to the above criteria
+     */
+    boolean isCheckpointable(ProcessingElement pe);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
new file mode 100644
index 0000000..dd6003c
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/CheckpointingTask.java
@@ -0,0 +1,33 @@
+package org.apache.s4.core.ft;
+
+import java.util.Map;
+import java.util.TimerTask;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * When checkpointing at regular time intervals, this class is used to actually perform the checkpoints. It iterates
+ * over all instances of the specified prototype, and checkpoints every instance whose dirty flag is set.
+ * <p>
+ * Checkpointing failures are logged per instance instead of being propagated: an exception escaping {@link #run()}
+ * would terminate the thread of a {@link java.util.Timer} (cancelling all of its tasks), or suppress subsequent runs
+ * of a periodic task in a {@link java.util.concurrent.ScheduledExecutorService}.
+ * </p>
+ */
+public class CheckpointingTask extends TimerTask {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CheckpointingTask.class);
+
+    ProcessingElement prototype;
+
+    /**
+     * @param prototype
+     *            prototype whose instances are checkpointed at each run
+     */
+    public CheckpointingTask(ProcessingElement prototype) {
+        super();
+        this.prototype = prototype;
+    }
+
+    @Override
+    public void run() {
+        Map<String, ProcessingElement> peInstances = prototype.getPEInstances();
+        for (ProcessingElement pe : peInstances.values()) {
+            // NOTE(review): assumes event processing synchronizes on the same PE instance — confirm
+            synchronized (pe) {
+                if (pe.isDirty()) {
+                    try {
+                        pe.checkpoint();
+                    } catch (RuntimeException e) {
+                        // keep checkpointing the other instances, and keep the scheduling thread alive
+                        logger.error("Cannot checkpoint PE [" + pe + "]", e);
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
new file mode 100644
index 0000000..3a95da6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/DefaultFileSystemStateStorage.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.codec.binary.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Files;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Implementation of a file system backend storage to persist checkpoints.
+ * </p>
+ * <p>
+ * The file system may be the default local file system when running on a single machine, but should be a distributed
+ * file system such as NFS when running on a cluster.
+ * </p>
+ * <p>
+ * Checkpoints are stored in individual files (1 file = 1 checkpoint id) in directories according to the following
+ * structure: <code>(storageRootpath)/prototypeId/encodedCheckpointId</code>, where the file name is the URL-safe
+ * Base64 encoding of the UTF-8 bytes of {@link CheckpointId#getStringRepresentation()}.
+ * </p>
+ *
+ */
+public class DefaultFileSystemStateStorage implements StateStorage {
+
+    private static final Logger logger = LoggerFactory.getLogger(DefaultFileSystemStateStorage.class);
+
+    /** Fixed charset for id <-> file name conversions, so that all nodes sharing the storage agree on file names. */
+    private static final java.nio.charset.Charset ID_CHARSET = java.nio.charset.Charset.forName("UTF-8");
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.filesystem.storageRootPath")
+    String storageRootPath;
+
+    public DefaultFileSystemStateStorage() {
+    }
+
+    /**
+     * <p>
+     * Called by the dependency injection framework, after construction.
+     * </p>
+     */
+    @Inject
+    public void init() {
+        checkStorageDir();
+    }
+
+    /**
+     * @return content of the checkpoint file for {@code key}, or {@code null} if there is no such file or it cannot
+     *         be read
+     */
+    @Override
+    public byte[] fetchState(CheckpointId key) {
+        File file = safeKeeperID2File(key, storageRootPath);
+        if (file == null || !file.exists()) {
+            return null;
+        }
+        if (logger.isDebugEnabled()) {
+            logger.debug("Fetching " + file.getAbsolutePath() + " for : " + key);
+        }
+        try {
+            return Files.toByteArray(file);
+        } catch (IOException e) {
+            logger.error("Cannot read content from checkpoint file [" + file.getAbsolutePath() + "]", e);
+            return null;
+        }
+    }
+
+    /**
+     * @return ids of all checkpoints found under the storage root; unreadable directories and files whose names do not
+     *         decode to a checkpoint id are skipped with a warning
+     */
+    @Override
+    public Set<CheckpointId> fetchStoredKeys() {
+        Set<CheckpointId> keys = new HashSet<CheckpointId>();
+        File rootDir = new File(storageRootPath);
+        File[] dirs = rootDir.listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File file) {
+                return file.isDirectory();
+            }
+        });
+        // listFiles returns null when the path is not a directory or cannot be read
+        if (dirs == null) {
+            logger.warn("Cannot list checkpoint storage directory [{}]", rootDir.getAbsolutePath());
+            return keys;
+        }
+        for (File dir : dirs) {
+            File[] files = dir.listFiles(new FileFilter() {
+                @Override
+                public boolean accept(File file) {
+                    return (file.isFile());
+                }
+            });
+            if (files == null) {
+                logger.warn("Cannot list checkpoint directory [{}]", dir.getAbsolutePath());
+                continue;
+            }
+            for (File file : files) {
+                try {
+                    keys.add(file2SafeKeeperID(file));
+                } catch (RuntimeException e) {
+                    // a stray file must not prevent listing the valid checkpoints
+                    logger.warn("Ignoring file that does not represent a checkpoint id: [" + file.getAbsolutePath()
+                            + "]", e);
+                }
+            }
+        }
+        return keys;
+    }
+
+    // files kept as : root/<prototypeId>/encodedKeyWithFullInfo
+    private static File safeKeeperID2File(CheckpointId key, String storageRootPath) {
+        String encodedId = Base64.encodeBase64URLSafeString(key.getStringRepresentation().getBytes(ID_CHARSET));
+        return new File(storageRootPath + File.separator + key.getPrototypeId() + File.separator + encodedId);
+    }
+
+    private static CheckpointId file2SafeKeeperID(File file) {
+        return new CheckpointId(new String(Base64.decodeBase64(file.getName()), ID_CHARSET));
+    }
+
+    /**
+     * Uses {@code <user.dir>/tmp/storage} as storage root when none was configured, creating it if needed.
+     */
+    public void checkStorageDir() {
+        if (storageRootPath == null) {
+
+            File defaultStorageDir = new File(System.getProperty("user.dir") + File.separator + "tmp" + File.separator
+                    + "storage");
+            storageRootPath = defaultStorageDir.getAbsolutePath();
+            logger.warn("Unspecified storage dir; using default dir: {}", defaultStorageDir.getAbsolutePath());
+            if (!defaultStorageDir.exists()) {
+                if (!(defaultStorageDir.mkdirs())) {
+                    logger.error("Storage directory not specified, and cannot create default storage directory : "
+                            + defaultStorageDir.getAbsolutePath() + "\n Checkpointing and recovery will be disabled.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes {@code state} into the checkpoint file for {@code key}, replacing any previous checkpoint, and reports the
+     * outcome through {@code callback}.
+     */
+    @Override
+    public void saveState(CheckpointId key, byte[] state, StorageCallback callback) {
+        File f = safeKeeperID2File(key, storageRootPath);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Checkpointing [" + key + "] into file: [" + f.getAbsolutePath() + "]");
+        }
+        if (!f.exists()) {
+            // parent directory is named after the prototype id
+            File parent = f.getParentFile();
+            // mkdirs() also returns false when a concurrent save created the directory first: only fail if it is
+            // still missing
+            if (!parent.exists() && !parent.mkdirs() && !parent.isDirectory()) {
+                callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                        "Cannot create directory for storing PE [" + key.toString() + "] for prototype: "
+                                + parent.getAbsolutePath());
+                return;
+            }
+        } else {
+            if (!f.delete()) {
+                callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                        "Cannot delete previously saved checkpoint file [" + f.getAbsolutePath() + "]");
+                return;
+            }
+        }
+
+        try {
+            Files.write(state, f);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.SUCCESS, key.toString());
+        } catch (IOException e) {
+            logger.error("Cannot write checkpoint file [" + f.getAbsolutePath() + "]", e);
+            callback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE, key.toString() + " : "
+                    + e.getMessage());
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
new file mode 100644
index 0000000..6994acd
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FetchTask.java
@@ -0,0 +1,36 @@
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint fetching operation.
+ *
+ */
+public class FetchTask implements Callable<byte[]> {
+
+    private static Logger logger = LoggerFactory.getLogger(FetchTask.class);
+
+    StateStorage stateStorage;
+    CheckpointId checkpointId;
+
+    public FetchTask(StateStorage stateStorage, CheckpointId checkpointId) {
+        super();
+        this.stateStorage = stateStorage;
+        this.checkpointId = checkpointId;
+    }
+
+    @Override
+    public byte[] call() throws Exception {
+        try {
+            byte[] result = stateStorage.fetchState(checkpointId);
+            return result;
+        } catch (Exception e) {
+            logger.error("Cannot fetch checkpoint data for {}", checkpointId, e);
+            throw e;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
new file mode 100644
index 0000000..dd984e6
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
@@ -0,0 +1,15 @@
+package org.apache.s4.core.ft;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * Guice module enabling checkpointing with a file system backend: {@link CheckpointingFramework} is bound to
+ * {@link SafeKeeper} and {@link StateStorage} to {@link DefaultFileSystemStateStorage}.
+ */
+public class FileSystemBackendCheckpointingModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        // checkpointing coordinator
+        bind(CheckpointingFramework.class).to(SafeKeeper.class);
+        // storage backend
+        bind(StateStorage.class).to(DefaultFileSystemStateStorage.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
new file mode 100644
index 0000000..28279ba
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/LoggingStorageCallbackFactory.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.core.ft.CheckpointingFramework.StorageResultCode;
+
+/**
+ * {@link StorageCallbackFactory} whose callbacks only log the results of storage operations.
+ */
+public class LoggingStorageCallbackFactory implements StorageCallbackFactory {
+
+    @Override
+    public StorageCallback createStorageCallback() {
+        return new StorageCallbackLogger();
+    }
+
+    /**
+     * Storage callback logging successes at debug level, and any other result as a failure at warn level, through the
+     * "s4-ft" log4j logger.
+     */
+    static class StorageCallbackLogger implements StorageCallback {
+
+        private static final Logger logger = Logger.getLogger("s4-ft");
+
+        @Override
+        public void storageOperationResult(StorageResultCode code, Object message) {
+            if (code != StorageResultCode.SUCCESS) {
+                // every non-success result is reported as a failure
+                logger.warn("Callback from storage: " + StorageResultCode.FAILURE.name() + " : " + message);
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Callback from storage: " + StorageResultCode.SUCCESS.name() + " : " + message);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
new file mode 100644
index 0000000..10387ce
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/NoOpCheckpointingFramework.java
@@ -0,0 +1,26 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * {@link CheckpointingFramework} that does NO checkpointing: no PE is ever checkpointable, nothing is saved, and no
+ * state is ever found.
+ */
+public final class NoOpCheckpointingFramework implements CheckpointingFramework {
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        return false;
+    }
+
+    /** Saves nothing. @return always {@code null}: no storage operation, hence no callback */
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        return null;
+    }
+
+    /** @return always {@code null}: there is never any saved state */
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
new file mode 100644
index 0000000..4c8358f
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.s4.core.ProcessingElement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+/**
+ * <p>
+ * Coordinates interactions between the S4 event processor and the checkpoint storage backend.
+ * </p>
+ * <p>
+ * Saving a checkpoint is asynchronous and done in two stages: the state of the PE is first serialized on the
+ * serialization thread pool, then the serialized bytes are written to the {@link StateStorage} backend by a
+ * {@link SaveStateTask} running on the storage thread pool. Fetching runs on a separate thread pool and is bounded by
+ * a timeout; after too many consecutive fetching failures, fetching is temporarily disabled.
+ * </p>
+ */
+public final class SafeKeeper implements CheckpointingFramework {
+
+    /** Logs exceptions escaping from checkpointing threads. */
+    private static final class UncaughtExceptionLogger implements UncaughtExceptionHandler {
+        String operationType;
+
+        public UncaughtExceptionLogger(String operationType) {
+            this.operationType = operationType;
+        }
+
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            logger.error("Cannot execute checkpointing " + operationType + " operation", e);
+        }
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(SafeKeeper.class);
+
+    @Inject
+    private StateStorage stateStorage;
+    @Inject(optional = true)
+    private StorageCallbackFactory storageCallbackFactory = new LoggingStorageCallbackFactory();
+
+    private ThreadPoolExecutor storageThreadPool;
+    private ThreadPoolExecutor serializationThreadPool;
+    private ThreadPoolExecutor fetchingThreadPool;
+
+    // Tuning parameters: the defaults below can be overridden through the corresponding named properties.
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxThreads")
+    int storageMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageThreadKeepAliveSeconds")
+    int storageThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.storageMaxOutstandingRequests")
+    int storageMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxThreads")
+    int serializationMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationThreadKeepAliveSeconds")
+    int serializationThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.serializationMaxOutstandingRequests")
+    int serializationMaxOutstandingRequests = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.maxSerializationLockTime")
+    long maxSerializationLockTime = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxThreads")
+    int fetchingMaxThreads = 1;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingThreadKeepAliveSeconds")
+    int fetchingThreadKeepAliveSeconds = 120;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxWaitMs")
+    long fetchingMaxWaitMs = 1000;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingMaxConsecutiveFailuresBeforeDisabling")
+    int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+
+    @Inject(optional = true)
+    @Named("s4.checkpointing.fetchingDisabledDurationMs")
+    long fetchingDisabledDurationMs = 600000;
+
+    /** Time at which fetching was last disabled (ms since the epoch), or -1 if it never was. */
+    volatile long fetchingDisabledInitTime = -1;
+    /** Number of consecutive fetching failures; reset on success and when fetching is re-enabled. */
+    AtomicInteger fetchingCurrentConsecutiveFailures = new AtomicInteger();
+
+    public SafeKeeper() {
+    }
+
+    @Inject
+    private void init() {
+
+        // NOTE: those thread pools should be fine tuned according to backend and application load/requirements.
+        // For now:
+        // - number of threads and work queue size have overridable defaults
+        // - failures are logged
+        // - when the storage queue is full, the thread calling saveState runs the storage task itself
+        // (CallerRunsPolicy), which throttles checkpoint producers
+        // - when the serialization queue is full, new entries are rejected (AbortPolicy)
+        // - fetching uses a synchronous hand-off queue: a fetch request is rejected when no fetching thread is
+        // available, otherwise the caller waits for the result with a timeout
+
+        ThreadFactory storageThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-storage-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("storage")).build();
+        storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests),
+                storageThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
+        storageThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory serializationThreadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("Checkpointing-serialization-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("serialization")).build();
+        serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads,
+                serializationThreadKeepAliveSeconds, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(
+                        serializationMaxOutstandingRequests), serializationThreadFactory,
+                new ThreadPoolExecutor.AbortPolicy());
+        serializationThreadPool.allowCoreThreadTimeOut(true);
+
+        ThreadFactory fetchingThreadFactory = new ThreadFactoryBuilder().setNameFormat("Checkpointing-fetching-%d")
+                .setUncaughtExceptionHandler(new UncaughtExceptionLogger("fetching")).build();
+        fetchingThreadPool = new ThreadPoolExecutor(1, fetchingMaxThreads, fetchingThreadKeepAliveSeconds,
+                TimeUnit.SECONDS, new SynchronousQueue<Runnable>(true), fetchingThreadFactory);
+        fetchingThreadPool.allowCoreThreadTimeOut(true);
+
+    }
+
+    /**
+     * Schedules an asynchronous checkpoint of the given PE: its state is serialized on the serialization thread
+     * pool, then written to the storage backend from the storage thread pool.
+     *
+     * @param pe
+     *            the processing element to checkpoint
+     * @return the callback notified of the outcome of the operation; if the serialization queue is full, this
+     *         callback has already been notified of the failure when this method returns
+     */
+    @Override
+    public StorageCallback saveState(ProcessingElement pe) {
+        StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+        Future<byte[]> futureSerializedState;
+        try {
+            futureSerializedState = serializeState(pe);
+        } catch (RejectedExecutionException e) {
+            // TODO report dropped serialization requests as a metric
+            // AbortPolicy rejects the new task: this checkpoint request is dropped
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Serialization task queue is full. Dropped serialization request for PE [" + pe.getId()
+                            + "]. Remaining capacity for the serialization task queue is ["
+                            + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + serializationThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + serializationMaxOutstandingRequests + "]");
+            return storageCallback;
+        }
+        submitSaveStateTask(new SaveStateTask(new CheckpointId(pe), futureSerializedState, storageCallback,
+                stateStorage), storageCallback);
+        return storageCallback;
+    }
+
+    /** Submits the serialization of the PE state to the serialization thread pool. */
+    private Future<byte[]> serializeState(ProcessingElement pe) {
+        // TODO report queued serialization requests as a metric
+        return serializationThreadPool.submit(new SerializeTask(pe));
+    }
+
+    /** Submits a storage task, notifying the callback of a failure if the task is rejected. */
+    private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
+        try {
+            // with CallerRunsPolicy, a full queue makes the calling thread run the task itself, and tasks submitted
+            // after shutdown are silently discarded: a rejection is therefore not expected, this is only defensive
+            storageThreadPool.execute(task);
+        } catch (RejectedExecutionException e) {
+            storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+                    "Cannot submit storage checkpoint task. Remaining capacity for task queue is ["
+                            + storageThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+                            + storageThreadPool.getQueue().size() + "] ; maximum capacity is ["
+                            + storageMaxOutstandingRequests + "]");
+        }
+    }
+
+    /**
+     * Fetches the serialized state stored for a checkpoint, waiting at most {@code fetchingMaxWaitMs} ms.
+     * <p>
+     * After {@code fetchingMaxConsecutiveFailuresBeforeDisabling} consecutive failures (rejection, timeout,
+     * interruption or backend error), fetching is disabled for {@code fetchingDisabledDurationMs} ms, during which
+     * this method returns {@code null} immediately.
+     * </p>
+     *
+     * @param key
+     *            id of the checkpoint to fetch
+     * @return the serialized state, or {@code null} if it could not be fetched or if the backend has none
+     */
+    @Override
+    public byte[] fetchSerializedState(CheckpointId key) {
+        if (isFetchingDisabled()) {
+            return null;
+        }
+        Future<byte[]> fetched;
+        try {
+            fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, key));
+        } catch (RejectedExecutionException e) {
+            // synchronous hand-off: rejected when all fetching threads are busy
+            logger.error("Cannot fetch checkpoint from backend for key [{}]: no fetching thread available",
+                    key.getStringRepresentation());
+            recordFetchingFailure();
+            return null;
+        }
+        try {
+            byte[] result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+            fetchingCurrentConsecutiveFailures.set(0);
+            return result;
+        } catch (TimeoutException te) {
+            // free the fetching thread, otherwise a stuck fetch makes all subsequent requests be rejected
+            fetched.cancel(true);
+            logger.error("Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+        } catch (InterruptedException e) {
+            fetched.cancel(true);
+            logger.error(
+                    "Cannot fetch checkpoint from backend for key [{}] before timeout of {} ms because of an interruption",
+                    key.getStringRepresentation(), fetchingMaxWaitMs);
+            Thread.currentThread().interrupt();
+        } catch (ExecutionException e) {
+            logger.error("Cannot fetch checkpoint from backend for key [{}] due to {}", key.getStringRepresentation(),
+                    e.getCause().getClass().getName() + "/" + e.getCause().getMessage());
+        }
+        recordFetchingFailure();
+        return null;
+    }
+
+    /**
+     * Tells whether fetching is currently disabled because of too many consecutive failures. Fetching is re-enabled
+     * (and the failure count reset) once the disabled period has elapsed.
+     */
+    private boolean isFetchingDisabled() {
+        if (fetchingCurrentConsecutiveFailures.get() < fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            return false;
+        }
+        if (System.currentTimeMillis() < fetchingDisabledInitTime + fetchingDisabledDurationMs) {
+            return true;
+        }
+        // disabled period elapsed: re-enable fetching
+        fetchingCurrentConsecutiveFailures.set(0);
+        return false;
+    }
+
+    /** Counts a fetching failure; disables fetching once too many consecutive failures occurred. */
+    private void recordFetchingFailure() {
+        if (fetchingCurrentConsecutiveFailures.incrementAndGet() == fetchingMaxConsecutiveFailuresBeforeDisabling) {
+            fetchingDisabledInitTime = System.currentTimeMillis();
+            logger.warn(
+                    "Due to {} successive checkpoint fetching failures, fetching is temporarily disabled for {} ms",
+                    fetchingMaxConsecutiveFailuresBeforeDisabling, fetchingDisabledDurationMs);
+        }
+    }
+
+    /**
+     * A PE is checkpointable when its checkpointing mode is not {@code NONE}, its checkpointing frequency is positive,
+     * it is dirty, and, in {@code EVENT_COUNT} mode, its event count is a multiple of that frequency. Other modes are
+     * not handled here and yield {@code false}.
+     */
+    @Override
+    public boolean isCheckpointable(ProcessingElement pe) {
+        if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.NONE)) {
+            return false;
+        }
+        if (pe.getCheckpointingConfig().frequency > 0 && pe.isDirty()) {
+            if (pe.getCheckpointingConfig().mode.equals(CheckpointingConfig.CheckpointingMode.EVENT_COUNT)) {
+                if (pe.getEventCount() % pe.getCheckpointingConfig().frequency == 0) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
new file mode 100644
index 0000000..11c5af3
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SaveStateTask.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
+ * <p>
+ * The state to save is either given directly as bytes, or as the future result of a serialization task; in the
+ * latter case, this task waits at most 1000 ms for serialization to complete. Any failure to obtain the state is
+ * reported as a failure to the storage callback; the outcome of the storage operation itself is notified by the
+ * {@link StateStorage} implementation.
+ * </p>
+ */
+public class SaveStateTask implements Runnable {
+
+    private static final Logger logger = LoggerFactory.getLogger(SaveStateTask.class);
+
+    /** Maximum time to wait for the serialization of the state, in milliseconds. */
+    private static final long SERIALIZATION_TIMEOUT_MS = 1000;
+
+    CheckpointId safeKeeperId;
+    byte[] serializedState;
+    Future<byte[]> futureSerializedState = null;
+    StorageCallback storageCallback;
+    StateStorage stateStorage;
+
+    /**
+     * Creates a task that saves already serialized state.
+     */
+    public SaveStateTask(CheckpointId safeKeeperId, byte[] state, StorageCallback storageCallback,
+            StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.serializedState = state;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    /**
+     * Creates a task that saves the result of a pending serialization.
+     */
+    public SaveStateTask(CheckpointId safeKeeperId, Future<byte[]> futureSerializedState,
+            StorageCallback storageCallback, StateStorage stateStorage) {
+        this.safeKeeperId = safeKeeperId;
+        this.futureSerializedState = futureSerializedState;
+        this.storageCallback = storageCallback;
+        this.stateStorage = stateStorage;
+    }
+
+    @Override
+    public void run() {
+        if (futureSerializedState == null) {
+            // state was given directly at construction time
+            stateStorage.saveState(safeKeeperId, serializedState, storageCallback);
+            return;
+        }
+        try {
+            stateStorage.saveState(safeKeeperId,
+                    futureSerializedState.get(SERIALIZATION_TIMEOUT_MS, TimeUnit.MILLISECONDS), storageCallback);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            reportFailure("interrupted while waiting for serialization");
+        } catch (ExecutionException e) {
+            logger.warn("Cannot save checkpoint : " + safeKeeperId, e);
+            reportFailure("serialization failed: " + e.getCause());
+        } catch (TimeoutException e) {
+            logger.warn("Cannot save checkpoint {} : could not serialize before timeout", safeKeeperId);
+            reportFailure("could not serialize before timeout");
+        }
+    }
+
+    /** Notifies the storage callback that this checkpoint could not be saved. */
+    private void reportFailure(String reason) {
+        storageCallback.storageOperationResult(CheckpointingFramework.StorageResultCode.FAILURE,
+                "Cannot save checkpoint " + safeKeeperId + " : " + reason);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
new file mode 100644
index 0000000..97619b5
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SerializeTask.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.core.ProcessingElement;
+
+/**
+ * Serializes the state of a PE, for checkpointing.
+ * <p>
+ * Serialization is performed while holding the PE's monitor, so that the PE cannot be modified concurrently by
+ * code that synchronizes on it. The PE is marked as "not dirty" once its state has been serialized successfully.
+ * </p>
+ */
+public class SerializeTask implements Callable<byte[]> {
+
+    ProcessingElement pe;
+
+    public SerializeTask(ProcessingElement pe) {
+        this.pe = pe;
+    }
+
+    /**
+     * @return the serialized state of the PE
+     * @throws Exception
+     *             if the state cannot be serialized, in which case the dirty flag of the PE is left unchanged
+     */
+    @Override
+    public byte[] call() throws Exception {
+        final ProcessingElement target = pe;
+        final byte[] serialized;
+        synchronized (target) {
+            serialized = target.serializeState();
+            target.setDirty(false);
+        }
+        return serialized;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
new file mode 100644
index 0000000..cbe2bda
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StateStorage.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+import java.util.Set;
+
+import com.google.inject.Inject;
+
+/**
+ * Storage backend for checkpoints.
+ * <p>
+ * NOTE: backend implementations usually need some initialization. The recommended place for it is a custom
+ * method annotated with {@link Inject}, which is called once the instance has been constructed.
+ * </p>
+ */
+public interface StateStorage {
+
+    /**
+     * Stores a checkpoint.
+     * <p>
+     * This method returns nothing: success and failure are notified exclusively through the given callback.
+     * </p>
+     *
+     * @param key
+     *            identifier of the checkpoint
+     * @param state
+     *            serialized checkpoint data
+     * @param callback
+     *            callback notified of the outcome of the storage operation
+     */
+    void saveState(CheckpointId key, byte[] state, StorageCallback callback);
+
+    /**
+     * Retrieves the data stored for a checkpoint.
+     *
+     * @param key
+     *            identifier of the checkpoint
+     * @return the stored checkpoint data, or {@code null} if the storage holds nothing for this key
+     */
+    byte[] fetchState(CheckpointId key);
+
+    /**
+     * Lists the identifiers of all stored checkpoints.
+     *
+     * @return the identifiers of all stored checkpoints
+     */
+    Set<CheckpointId> fetchStoredKeys();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
new file mode 100644
index 0000000..6676dd0
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallback.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * Receives the result of an asynchronous storage operation.
+ */
+public interface StorageCallback {
+
+    /**
+     * Called with the result of a storage operation.
+     *
+     * @param resultCode
+     *            outcome of the operation, see {@link CheckpointingFramework.StorageResultCode}
+     * @param message
+     *            any object describing the result, suitable for reporting
+     */
+    void storageOperationResult(CheckpointingFramework.StorageResultCode resultCode, Object message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
new file mode 100644
index 0000000..c448399
--- /dev/null
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/ft/StorageCallbackFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.s4.core.ft;
+
+/**
+ * Creates the {@link StorageCallback} instances notified of the outcome of asynchronous storage operations, so that
+ * specific actions can be taken upon success or failure.
+ */
+public interface StorageCallbackFactory {
+
+    /**
+     * Creates a storage callback.
+     *
+     * @return a {@link StorageCallback} instance
+     */
+    StorageCallback createStorageCallback();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
index 12f5d2f..70fe4ec 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java
@@ -6,16 +6,17 @@ import java.util.concurrent.TimeUnit;
 
 import junit.framework.Assert;
 
-import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.comm.BareCommModule;
 import org.apache.s4.core.triggers.TriggeredApp;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.StringEvent;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.NIOServerCnxn.Factory;
 import org.junit.After;
 
-import com.google.common.io.Resources;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 
@@ -45,9 +46,7 @@ public abstract class TriggerTest extends ZkBasedTest {
 
     protected CountDownLatch createTriggerAppAndSendEvent() throws IOException, KeeperException, InterruptedException {
         final ZooKeeper zk = CommTestUtils.createZkClient();
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = Guice.createInjector(new BareCommModule(), new BareCoreModule());
         app = injector.getInstance(TriggeredApp.class);
         app.init();
         app.start();
@@ -61,7 +60,7 @@ public abstract class TriggerTest extends ZkBasedTest {
         CountDownLatch signalEvent1Triggered = new CountDownLatch(1);
         CommTestUtils.watchAndSignalCreation("/onTrigger[StringEvent]@" + time1, signalEvent1Triggered, zk);
 
-        CommTestUtils.injectIntoStringSocketAdapter(time1);
+        app.stream.receiveEvent(new EventMessage("-1", "stream", app.getSerDeser().serialize(new StringEvent(time1))));
 
         // check event processed
         Assert.assertTrue(signalEvent1Processed.await(5, TimeUnit.SECONDS));
@@ -69,5 +68,4 @@ public abstract class TriggerTest extends ZkBasedTest {
         // return latch on trigger signal
         return signalEvent1Triggered;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/ab1fca5e/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
new file mode 100644
index 0000000..431e31e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingModuleWithUnrespondingFetchingStorageBackend.java
@@ -0,0 +1,20 @@
+package org.apache.s4.core.ft;
+
+import org.apache.s4.core.ft.FileSystemBasedBackendWithZKStorageCallbackCheckpointingModule.DummyZKStorageCallbackFactory;
+import org.apache.s4.fixtures.CommTestUtils;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+public class CheckpointingModuleWithUnrespondingFetchingStorageBackend extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(String.class).annotatedWith(Names.named("s4.checkpointing.filesystem.storageRootPath")).toInstance(
+                CommTestUtils.DEFAULT_STORAGE_DIR.getAbsolutePath());
+        bind(StateStorage.class).to(StorageWithUnrespondingFetching.class);
+        bind(CheckpointingFramework.class).to(SafeKeeper.class);
+        bind(StorageCallbackFactory.class).to(DummyZKStorageCallbackFactory.class);
+    }
+
+}