You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/31 09:27:19 UTC

git commit: provide optional recovery after expiry from persister cache

Updated Branches:
  refs/heads/S4-42 [created] b43de357d


provide optional recovery after expiry from persister cache

- also allow injection of persister through persister factory


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/b43de357
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/b43de357
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/b43de357

Branch: refs/heads/S4-42
Commit: b43de357d49711fc319c46865fdb08f67414d4f8
Parents: 45efb82
Author: Matthieu Morel <mm...@apache.org>
Authored: Tue Jan 31 10:03:22 2012 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Tue Jan 31 10:24:53 2012 +0100

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/ft/SafeKeeper.java |   28 ++
 .../org/apache/s4/persist/ConMapPersister.java     |    4 +
 .../org/apache/s4/persist/HashMapPersister.java    |    4 +
 .../java/org/apache/s4/processor/AbstractPE.java   |   31 ++-
 .../org/apache/s4/processor/PrototypeWrapper.java  |    4 -
 .../java/org/apache/s4/ft/CheckpointingTest.java   |    3 +-
 .../test/java/org/apache/s4/ft/RecoveryTest.java   |    4 +-
 s4-core/src/test/java/org/apache/s4/ft/S4App.java  |   25 ++-
 .../src/test/java/org/apache/s4/ft/TestUtils.java  |    4 +-
 .../java/org/apache/s4/ft/pecache/CacheTestPE.java |   60 +++++
 .../TestRecoveryInteractionWithPECache.java        |  101 ++++++++
 .../pecache/app_conf_noRecoveryAfterExpiration.xml |   18 ++
 .../pecache/app_conf_recoveryAfterExpiration.xml   |   18 ++
 .../s4/ft/pecache/s4_core_conf_fs_backend.xml      |  196 +++++++++++++++
 .../java/org/apache/s4/ft/pecache/wall_clock.xml   |    6 +
 .../apache/s4/ft/wordcount/FTWordCountTest.java    |    6 +-
 .../apache/s4/processor/TestPrototypeWrapper.java  |    2 +-
 .../org/apache/s4/wordcount/WordCountTest.java     |    2 +-
 18 files changed, 491 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
index f47fe44..88e0b94 100644
--- a/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
+++ b/s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -22,9 +22,12 @@ import org.apache.s4.dispatcher.partitioner.Hasher;
 import org.apache.s4.emitter.CommLayerEmitter;
 import org.apache.s4.processor.AbstractPE;
 import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.clock.Clock;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
@@ -256,4 +259,29 @@ public class SafeKeeper {
         this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
     }
 
+	/**
+	 * Recovery depends on expiration settings: 
+	 * a PE may or may not recover its previous state if it was expired, depending on the "recoveryAfterExpiration" setting.
+	 * @param abstractPE TODO
+	 * @param pe TODO
+	 * 
+	 */
+	public boolean mustRestoreState(AbstractPE pe, int ttl, Clock clock) {
+		if (pe.isRecoveryAfterExpiration()) {
+			return true;
+		} else {
+			// NOTE : ttl is not checkpointed. Get the value from current prototype
+			if (pe.getCacheAddDate()!=-1 && ttl!=-1 && 
+					(pe.getCacheAddDate() + (1000 * ttl)) <= clock.getCurrentTime()
+					) {
+				if (logger.isDebugEnabled()) {
+					logger.debug("Not recovering PE ["+ pe.getSafeKeeperId() + "] because it was expired");
+				}
+				return false;
+			} else {
+				return true;
+			}
+		}
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
index 8b3f529..7c09c12 100644
--- a/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
+++ b/s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
@@ -17,6 +17,7 @@
  */
 package org.apache.s4.persist;
 
+import org.apache.s4.processor.AbstractPE;
 import org.apache.s4.util.clock.Clock;
 
 import java.util.Enumeration;
@@ -121,6 +122,9 @@ public class ConMapPersister implements Persister {
         ce.value = value;
         ce.period = period;
         ce.addTime = s4Clock.getCurrentTime();
+        if (value instanceof AbstractPE) {
+            ((AbstractPE)value).setCacheAddDate(ce.addTime); 
+        }
         cache.put(key, ce);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
index 53e8380..f766d9e 100644
--- a/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
+++ b/s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
@@ -17,6 +17,7 @@
  */
 package org.apache.s4.persist;
 
+import org.apache.s4.processor.AbstractPE;
 import org.apache.s4.util.clock.Clock;
 
 import java.util.ArrayList;
@@ -123,6 +124,9 @@ public class HashMapPersister implements Persister {
         ce.value = value;
         ce.period = period;
         ce.addTime = s4Clock.getCurrentTime();
+    	if (value instanceof AbstractPE) {
+    		((AbstractPE)value).setCacheAddDate(ce.addTime);
+    	}
         cache.put(key, ce);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
index 58757d3..65a4fdd 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -83,7 +83,7 @@ public abstract class AbstractPE implements Cloneable {
         }
     }
 
-    transient private Clock clock;
+    private transient Clock clock;
     // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
     // thread when checkpointing activated
     transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
@@ -120,6 +120,8 @@ public abstract class AbstractPE implements Cloneable {
     transient private int checkpointableEventCount = 0;
     transient private int checkpointsBeforePause = -1;
     transient private long checkpointingPauseTimeInMillis;
+	private boolean isRecoveryAfterExpiration;
+    private long cacheAddDate = -1;
 
     transient private OverloadDispatcher overloadDispatcher;
 
@@ -209,7 +211,7 @@ public abstract class AbstractPE implements Cloneable {
             // initialize checkpointing event flag
             this.isCheckpointingEvent = false;
             if (!recoveryAttempted) {
-                recover();
+           	    recover();
                 recoveryAttempted = true;
             }
         }
@@ -562,7 +564,9 @@ public abstract class AbstractPE implements Cloneable {
         }
         try {
             AbstractPE peInOldState = deserializeState(serializedState);
-            restoreState(peInOldState);
+            if (safeKeeper.mustRestoreState(peInOldState, ttl, clock)) {
+            	restoreState(peInOldState);
+            }
         } catch (RuntimeException e) {
             Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
         }
@@ -578,6 +582,10 @@ public abstract class AbstractPE implements Cloneable {
             this.safeKeeperSetSignal.countDown();
         }
     }
+    
+    public SafeKeeper getSafeKeeper() {
+    	return this.safeKeeper;
+    }
 
     public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
         isCheckpointingEvent = true;
@@ -777,4 +785,21 @@ public abstract class AbstractPE implements Cloneable {
         }
 
     }
+
+	public void setCacheAddDate(long cacheAddDate) {
+		this.cacheAddDate = cacheAddDate;
+	}
+
+	public long getCacheAddDate() {
+		return cacheAddDate;
+	}
+	
+	public boolean isRecoveryAfterExpiration() {
+		return isRecoveryAfterExpiration;
+	}
+
+	public void setRecoveryAfterExpiration(boolean isRecoveryAfterExpiration) {
+		this.isRecoveryAfterExpiration = isRecoveryAfterExpiration;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
index 21260ed..c233c5f 100644
--- a/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
+++ b/s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
@@ -31,7 +31,6 @@ public class PrototypeWrapper {
     private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
     private AbstractPE prototype;
     Persister lookupTable;
-	SafeKeeper safeKeeper;
 
     public String getId() {
         return prototype.getId();
@@ -125,7 +124,4 @@ public class PrototypeWrapper {
         return prototype.advise();
     }
 
-	public void setSafeKeeper(SafeKeeper safeKeeper) {
-		this.safeKeeper = safeKeeper;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
index 37f2a9a..3fd70a7 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
@@ -31,7 +31,7 @@ public class CheckpointingTest extends S4TestCase {
     public void prepare() throws Exception {
         zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
         app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
-        app.initializeS4App();
+        app.initializeS4App("app_conf.xml");
     }
 
     @After
@@ -101,6 +101,7 @@ public class CheckpointingTest extends S4TestCase {
             keyValueStringField.setAccessible(true);    
             keyValueStringField.set(refPE, "value");
             refPE.setId("statefulPE");
+            refPE.setCacheAddDate(pe.getCacheAddDate());
             refPE.setKeys(new String[] {});
             KryoSerDeser kryoSerDeser = new KryoSerDeser();
             byte[] refBytes = kryoSerDeser.serialize(refPE);

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
index 1ca5652..779f1b0 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -51,7 +51,7 @@ public class RecoveryTest extends S4TestCase {
         final ZooKeeper zk = TestUtils.createZkClient();
         // 1. instantiate remote S4 app
         forkedS4App = TestUtils.forkS4App(getClass().getName(),
-                "s4_core_conf_fs_backend.xml");
+                "s4_core_conf_fs_backend.xml", "app_conf.xml");
         // TODO synchro
         Thread.sleep(4000);
 
@@ -93,7 +93,7 @@ public class RecoveryTest extends S4TestCase {
         StatefulTestPE.DATA_FILE.delete();
 
         forkedS4App = TestUtils.forkS4App(getClass().getName(),
-                "s4_core_conf_fs_backend.xml");
+                "s4_core_conf_fs_backend.xml", "app_conf.xml");
         // TODO synchro
         Thread.sleep(2000);
         // trigger recovery by sending application event to set value 2

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4App.java b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
index bd4a6df..42d46e4 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/S4App.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
@@ -59,19 +59,23 @@ public class S4App {
      * @param args
      * @throws Exception
      */
-    public static void main(String[] args) throws Exception {
-        Class testClass = Class.forName(args[0]);
-        String s4CoreConfFile = args[1];
-        S4App app = new S4App(testClass, s4CoreConfFile);
-        S4TestCase.initS4Parameters();
-        app.initializeS4App();
-
+    public static void main(String[] args) {
+    	try {
+    		Class testClass = Class.forName(args[0]);
+    		String s4CoreConfFile = args[1];
+    		String s4AppConfFile = args[2];
+    		S4App app = new S4App(testClass, s4CoreConfFile);
+    		S4TestCase.initS4Parameters();
+    		app.initializeS4App(s4AppConfFile);
+    	} catch (Exception e) {
+    		Logger.getLogger(S4App.class).error("Cannot start S4 app", e);
+    	}
     }
 
     /**
      * Performs dependency injection and starts the S4 plaftform.
      */
-    public void initializeS4App()
+    public void initializeS4App(String s4AppConfFile)
             throws Exception {
         initConfigPaths(testClass, s4CoreConfFileName);
         ApplicationContext coreContext = null;
@@ -93,6 +97,7 @@ public class S4App {
         Watcher w = (Watcher) context.getBean("watcher");
         w.setConfigFilename(configBase + s4CoreConfFileName);
 
+        Logger.getLogger(getClass()).info("initializing app");
         // load extension modules
         // String[] configFileNames = getModuleConfigFiles(extsHome, prop);
         // if (configFileNames.length > 0) {
@@ -105,7 +110,8 @@ public class S4App {
         // }
 
         // load application modules
-        String applicationConfigFileName = configBase + "app_conf.xml";
+//        String applicationConfigFileName = configBase + "app_conf.xml";
+        String applicationConfigFileName = configBase + s4AppConfFile;
         String[] configFileUrls = new String[] { "file:"
                 + applicationConfigFileName };
         context = new FileSystemXmlApplicationContext(configFileUrls, context);
@@ -128,6 +134,7 @@ public class S4App {
                     }
                 }
                 ((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+                
             } catch (NoSuchMethodException mnfe) {
                 // acceptable
             }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
index 84af34f..225c74d 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -35,7 +35,8 @@ import org.apache.zookeeper.server.ZooKeeperServer;
 public class TestUtils {
 
     public static final int ZK_PORT = 21810;
-    public static Process forkS4App(String testClassName, String s4CoreConfFileName) throws IOException,
+    
+    public static Process forkS4App(String testClassName, String s4CoreConfFileName, String s4AppConfFileName) throws IOException,
             InterruptedException {
         List<String> cmdList = new ArrayList<String>();
         cmdList.add("java");
@@ -53,6 +54,7 @@ public class TestUtils {
         cmdList.add(S4App.class.getName());
         cmdList.add(testClassName);
         cmdList.add(s4CoreConfFileName);
+        cmdList.add(s4AppConfFileName);
 
         ProcessBuilder pb = new ProcessBuilder(cmdList);
         pb.directory(new File(System.getProperty("user.dir")));

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java b/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
new file mode 100644
index 0000000..5f4d35f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
@@ -0,0 +1,60 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class CacheTestPE extends AbstractPE implements Watcher {
+
+	String value = "";
+	transient ZooKeeper zk = null;
+
+	public void processEvent(KeyValue event) {
+		if (zk == null) {
+			Logger.getLogger(getClass()).info("Creating ZK connection");
+            try {
+                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+		if ("key".equals(event.getKey())) {
+			setValue(this.value + event.getValue());
+			try {
+				Logger.getLogger(getClass()).info("setting ZK /value");
+				zk.create("/value", value.getBytes(), Ids.OPEN_ACL_UNSAFE,
+						CreateMode.PERSISTENT);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		} else {
+			throw new RuntimeException("unknown event " + event);
+
+		}
+	}
+
+	public void setValue(String value) {
+		this.value = value;
+	}
+	
+	@Override
+	public void output() {
+		// TODO Auto-generated method stub
+
+	}
+
+	@Override
+	public void process(WatchedEvent arg0) {
+		// TODO Auto-generated method stub
+		
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java b/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
new file mode 100644
index 0000000..bc868ea
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
@@ -0,0 +1,101 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4App;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRecoveryInteractionWithPECache extends S4TestCase {
+	
+	private static Factory zookeeperServerConnectionFactory;
+	private Process forkedS4App;
+	
+	@Before
+    public void prepare() throws IOException, InterruptedException, KeeperException {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+
+    }
+	
+	@After
+    public void cleanup() throws Exception {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        forkedS4App.destroy();
+    }
+	
+	@Test
+	public void testNoRecoveryAfterExpiration() throws Exception {
+		byte[] data = testAndReturnValueAfterExpiration("app_conf_noRecoveryAfterExpiration.xml");
+		Assert.assertTrue("got: " + new String(data) , new String(data).equals("value2-"));
+	}
+
+	@Test
+	public void testRecoveryAfterExpiration() throws Exception {
+		byte[] data = testAndReturnValueAfterExpiration("app_conf_recoveryAfterExpiration.xml");
+		Assert.assertTrue(new String(data), new String(data).equals("value1-value2-"));
+	}
+	
+	
+	private byte[] testAndReturnValueAfterExpiration(String appConfig) throws Exception,
+			IOException, KeeperException, InterruptedException, JSONException {
+		
+		forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+//		app.initializeS4App();
+		final ZooKeeper zk = TestUtils.createZkClient();
+		
+		CountDownLatch latch1=new CountDownLatch(1);
+		TestUtils.watchAndSignalCreation("/value", latch1,
+                zk);
+		
+		EventGenerator generator = new EventGenerator();
+		generator.injectValueEvent(new KeyValue("key", "value1-"),
+                    "Values", 0);
+		
+		latch1.await(10, TimeUnit.SECONDS);
+		
+		byte[] data = zk.getData("/value", false, null);
+		Assert.assertTrue(new String(data).equals("value1-"));
+		
+		zk.delete("/value", -1);
+		
+		boolean nodeDeleted = false;
+		try {
+			zk.getData("/value", false, null);
+		} catch (NoNodeException e) {
+			nodeDeleted = true;
+		}
+		Assert.assertTrue(nodeDeleted);
+		// NOTE: we need sleeps to wait for checkpoints (in the absence of ZK notifications for that)
+		Thread.sleep(1000);
+		forkedS4App.destroy();
+		forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+		CountDownLatch latch2=new CountDownLatch(1);
+		TestUtils.watchAndSignalCreation("/value", latch2,
+                zk);
+		generator.injectValueEvent(new KeyValue("key", "value2-"),
+                "Values", 0);
+		latch2.await(10, TimeUnit.SECONDS);
+		data = zk.getData("/value", false, null);
+		return data;
+	}
+	
+	
+	
+	
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
new file mode 100644
index 0000000..52c3b0b
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+	<bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+    <property name="id" value="cacheTestPE"/>
+    <property name="keys">
+      <list>
+        <value>Values key</value>
+      </list>
+    </property>
+    <property name="checkpointingFrequencyByEventCount" value="1" /> 
+    <property name="ttl" value="1"/> 
+    <property name="recoveryAfterExpiration" value="false"/>
+  </bean>
+	
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
new file mode 100644
index 0000000..dd455d4
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+	<bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+    <property name="id" value="cacheTestPE"/>
+    <property name="keys">
+      <list>
+        <value>Values key</value>
+      </list>
+    </property>
+    <property name="checkpointingFrequencyByEventCount" value="1" /> 
+    <property name="ttl" value="1"/> 
+    <property name="recoveryAfterExpiration" value="true"/>
+  </bean>
+	
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..7f0e26d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter). 
+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+	
+	<bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="org.apache.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+	<bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="fsStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+    
+    <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+    <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+        <!-- if not specified, default is <current_dir>/tmp/storage 
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+    
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
new file mode 100644
index 0000000..cc571a6
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ 
+  <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+ 
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
index 37a68d1..3259277 100644
--- a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -83,7 +83,7 @@ public class FTWordCountTest extends S4TestCase {
             throws Exception {
         final ZooKeeper zk = TestUtils.createZkClient();
 
-        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
         TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
@@ -110,7 +110,7 @@ public class FTWordCountTest extends S4TestCase {
         forkedS4App.destroy();
 
         // recovering and making sure checkpointing still works
-        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
 
         // add authorizations for continuing processing. Without these, the
         // WordClassifier processed keeps waiting
@@ -136,7 +136,7 @@ public class FTWordCountTest extends S4TestCase {
 
         // crash the app
         forkedS4App.destroy();
-        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
 
         // add authorizations for continuing processing. Without these, the
         // WordClassifier processed keeps waiting

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
index 445972a..155b032 100644
--- a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
+++ b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
@@ -2,8 +2,8 @@ package org.apache.s4.processor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import org.apache.s4.util.clock.WallClock;
 
+import org.apache.s4.util.clock.WallClock;
 import org.junit.Test;
 
 public class TestPrototypeWrapper

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/b43de357/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5a511d7..99ef2b8 100644
--- a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -46,7 +46,7 @@ public class WordCountTest extends S4TestCase {
     public void testSimple() throws Exception {
         
         S4App app = new S4App(getClass(), "s4_core_conf.xml");
-        app.initializeS4App();
+        app.initializeS4App("app_conf.xml");
         final ZooKeeper zk = TestUtils.createZkClient();
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);