You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC

[26/50] [abbrv] Rename packages in preparation for move to Apache

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml b/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
deleted file mode 100755
index 20c8bb1..0000000
--- a/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
+++ /dev/null
@@ -1,194 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
-	<bean id="propertyConfigurer"
-		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
-		<property name="location">
-			<value>classpath:s4_core.properties</value>
-		</property>
-		<property name="properties">
-			<props>
-				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
-				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
-			</props>
-		</property>
-		<property name="ignoreUnresolvablePlaceholders" value="true" />
-	</bean>
-
-	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
-
-	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
-		init-method="init">
-		<property name="serDeser" ref="serDeser" />
-		<property name="listener" ref="rawListener" />
-		<property name="listenerAppName" value="${adapter_app_name}" />
-		<property name="monitor" ref="monitor" />
-	</bean>
-
-	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
-		init-method="init">
-		<property name="serDeser" ref="serDeser" />
-		<property name="listener" ref="rawListener" />
-		<property name="monitor" ref="monitor" />
-	</bean>
-
-	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
-		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
-		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
-	</bean>
-
-	<!--START: Dispatchers for control event processor. If stream name in Response 
-		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter). 
-		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
-	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
-		<property name="dispatchers">
-			<list>
-				<ref bean="ctrlDispatcherFilteredS4" />
-				<ref bean="ctrlDispatcherFilteredAdapter" />
-			</list>
-		</property>
-	</bean>
-
-	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
-		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
-		<property name="streams">
-			<list>
-				<value>@${adapter_app_name}</value>
-			</list>
-		</property>
-	</bean>
-
-	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
-		<property name="dispatcher" ref="ctrlDispatcherS4" />
-		<property name="streams">
-			<list>
-				<value>@${adapter_app_name}</value>
-			</list>
-		</property>
-	</bean>
-
-	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
-		<property name="hasher" ref="hasher" />
-		<property name="debug" value="false" />
-	</bean>
-
-	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
-		init-method="init">
-		<property name="partitioners">
-			<list>
-				<ref bean="genericPartitioner" />
-			</list>
-		</property>
-		<property name="eventEmitter" ref="commLayerEmitter" />
-		<property name="loggerName" value="s4" />
-	</bean>
-
-	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
-		init-method="init">
-		<property name="partitioners">
-			<list>
-				<ref bean="genericPartitioner" />
-			</list>
-		</property>
-		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
-		<property name="loggerName" value="s4" />
-	</bean>
-	<!-- END: Dispatchers for control events -->
-
-	<!-- Control Events handler -->
-	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
-		<property name="dispatcher" ref="ctrlDispatcher" />
-	</bean>
-
-	<bean id="peContainer" class="io.s4.processor.PEContainer"
-		init-method="init" lazy-init="true">
-		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
-		<property name="monitor" ref="monitor" />
-		<property name="trackByKey" value="true" />
-		<property name="clock" ref="clock" />
-		<property name="controlEventProcessor" ref="ctrlHandler" />
-		<property name="safeKeeper" ref="safeKeeper" />
-	</bean>
-
-	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
-		init-method="init">
-		<property name="serDeser" ref="serDeser" />
-		<property name="clusterManagerAddress" value="${zk_address}" />
-		<property name="appName" value="${s4_app_name}" />
-		<property name="maxQueueSize" value="${listener_max_queue_size}" />
-		<property name="monitor" ref="monitor" />
-	</bean>
-
-	<bean id="eventListener" class="io.s4.collector.EventListener"
-		init-method="init">
-		<property name="rawListener" ref="rawListener" />
-		<property name="peContainer" ref="peContainer" />
-		<property name="monitor" ref="monitor" />
-	</bean>
-
-	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
-		init-method="init">
-		<property name="flushInterval" value="30" />
-		<property name="loggerName" value="monitor" />
-	</bean>
-
-	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
-		lazy-init="true">
-		<property name="monitor" ref="monitor" />
-		<property name="peContainer" ref="peContainer" />
-		<property name="minimumMemory" value="52428800" />
-	</bean>
-
-
-
-
-	<!-- Some useful beans related to client-adapter for apps -->
-
-	<!-- Dispatcher to send to all adapter nodes. -->
-	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
-		init-method="init">
-		<property name="partitioners">
-			<list>
-				<ref bean="broadcastPartitioner" />
-			</list>
-		</property>
-		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
-		<property name="loggerName" value="s4" />
-	</bean>
-
-	<!-- Partitioner to achieve broadcast -->
-	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner" />
-
-
-
-	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
-        init-method="init">
-        <property name="partitioners">
-            <list>
-                <ref bean="loopbackPartitioner" />
-            </list>
-        </property>
-        <property name="eventEmitter" ref="commLayerEmitter" />
-        <property name="loggerName" value="s4" />
-    </bean>
-
-    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
-        <property name="eventEmitter" ref="commLayerEmitter"/>
-    </bean>
-
-    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
-        <property name="stateStorage" ref="fsStateStorage" />
-        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
-        <property name="serializer" ref="serDeser"/>
-        <property name="hasher" ref="hasher"/>
-    </bean>
-
-    <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="checkStorageDir">
-        <!-- if not specified, default is <current_dir>/tmp/storage 
-        <property name="storageRootPath" value="${storage_root_path}" /> -->
-    </bean>
-
-
-
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml b/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
deleted file mode 100644
index e149ecc..0000000
--- a/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
- 
-  <bean id="clock" class="io.s4.util.clock.WallClock"/>
- 
-</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
new file mode 100644
index 0000000..c0dfa30
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
@@ -0,0 +1,108 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.serialize.KryoSerDeser;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.reflectasm.FieldAccess;
+
+public class CheckpointingTest extends S4TestCase {
+
+    private static Factory zookeeperServerConnectionFactory = null;
+    private S4App app;
+
+    @Before
+    public void prepare() throws Exception {
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
+        app.initializeS4App();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        if (app!=null) {
+            app.destroy();
+        }
+    }
+
+    @Test
+    public void testCheckpointStorage() throws Exception {
+            final ZooKeeper zk = TestUtils.createZkClient();
+
+            
+
+            // 2. generate a simple event
+            // that creates and changes
+            // the state of the PE
+
+            // NOTE: coordinate through zookeeper
+            final CountDownLatch signalValue1Set = new CountDownLatch(1);
+
+            TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+            final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/checkpointed",
+                    signalCheckpointed, zk);
+            EventGenerator gen = new EventGenerator();
+            gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1",
+                    0);
+
+            signalValue1Set.await();
+            StatefulTestPE pe = (StatefulTestPE) S4TestCase.registeredPEs
+                    .get(new SafeKeeperId("statefulPE", "value"));
+            Assert.assertEquals("message1", pe.getValue1());
+            Assert.assertEquals("", pe.getValue2());
+
+            // 3. generate a checkpoint event 
+            gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+                    "Stream1", 0);
+            signalCheckpointed.await();
+
+            // NOTE: the backend has asynchronous save operations
+            Thread.sleep(1000);
+
+            SafeKeeperId safeKeeperId = pe.getSafeKeeperId();
+            File expected = new File(System.getProperty("user.dir")
+                    + File.separator
+                    + "tmp"
+                    + File.separator
+                    + "storage"
+                    + File.separator
+                    + safeKeeperId.getPrototypeId()
+                    + File.separator
+                    + Base64.encodeBase64URLSafeString(safeKeeperId
+                            .getStringRepresentation().getBytes()));
+
+            // 4. verify that state was correctly persisted
+            Assert.assertTrue(expected.exists());
+
+            StatefulTestPE refPE = new StatefulTestPE();
+            refPE.setValue1("message1");
+            refPE.setId("statefulPE");
+            refPE.setKeys(new String[] {});
+            KryoSerDeser kryoSerDeser = new KryoSerDeser();
+            byte[] refBytes = kryoSerDeser.serialize(refPE);
+
+            Assert.assertTrue(Arrays.equals(refBytes,
+                    TestUtils.readFileAsByteArray(expected)));
+
+            app.destroy();
+            
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java b/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
new file mode 100644
index 0000000..bb4aa9c
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
@@ -0,0 +1,63 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.serialize.KryoSerDeser;
+import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.LoadGenerator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+public class EventGenerator {
+
+    private CommLayerEmitter eventEmitter;
+
+    public EventGenerator() {
+        SerializerDeserializer serDeser = new KryoSerDeser();
+
+        eventEmitter = new CommLayerEmitter();
+        eventEmitter.setAppName("s4");
+        eventEmitter.setListenerAppName("s4");
+        eventEmitter.setClusterManagerAddress("localhost");
+        eventEmitter
+                .setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
+        eventEmitter.setSerDeser(serDeser);
+        eventEmitter.init();
+
+        LoadGenerator generator = new LoadGenerator();
+        generator.setEventEmitter(eventEmitter);
+    }
+
+    public void injectValueEvent(KeyValue keyValue, String streamName,
+            int partitionId) throws JSONException {
+
+        Schema schema = new Schema(KeyValue.class);
+        JSONObject jsonRecord = new JSONObject("{key:" + keyValue.getKey()
+                + ",value:" + keyValue.getValue() + "}");
+        Object event = LoadGenerator.makeRecord(jsonRecord, schema);
+        CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
+        compoundKeyInfo.setCompoundKey("key");
+        compoundKeyInfo.setCompoundValue("value");
+        List<CompoundKeyInfo> compoundKeyInfos = new ArrayList<CompoundKeyInfo>();
+        compoundKeyInfos.add(compoundKeyInfo);
+        EventWrapper eventWrapper = new EventWrapper(streamName, event,
+                compoundKeyInfos);
+        eventEmitter.emit(partitionId, eventWrapper);
+    }
+
+    public void injectEvent(Object event, String streamName, int partitionId,
+            List<CompoundKeyInfo> compoundKeyInfos) throws JSONException {
+
+        EventWrapper eventWrapper = new EventWrapper(streamName, event,
+                compoundKeyInfos);
+        eventEmitter.emit(partitionId, eventWrapper);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java b/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
new file mode 100644
index 0000000..e94ae2f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
@@ -0,0 +1,33 @@
+package org.apache.s4.ft;
+
+public class KeyValue {
+
+	String key;
+	String value;
+
+	public KeyValue() {
+	}
+
+	public KeyValue(String key, String value) {
+		super();
+		this.key = key;
+		this.value = value;
+	}
+
+	public String getKey() {
+		return key;
+	}
+
+	public String getValue() {
+		return value;
+	}
+
+	public void setKey(String key) {
+		this.key = key;
+	}
+
+	public void setValue(String value) {
+		this.value = value;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
new file mode 100644
index 0000000..1ca5652
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -0,0 +1,113 @@
+package org.apache.s4.ft;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RecoveryTest extends S4TestCase {
+
+    public static long ZOOKEEPER_PORT = 21810;
+    private Process forkedS4App = null;
+    private static Factory zookeeperServerConnectionFactory = null;
+
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            zk.delete("/value1Set", -1);
+        } catch (Exception ignored) {
+        }
+        try {
+            // FIXME can't figure out where this is retained
+            zk.delete("/value2Set", -1);
+        } catch (Exception ignored) {
+        }
+        try {
+            // FIXME can't figure out where this is retained
+            zk.delete("/checkpointed", -1);
+        } catch (Exception ignored) {
+        }
+        zk.close();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        TestUtils.killS4App(forkedS4App);
+    }
+
+    @Test
+    public void testCheckpointRestorationThroughApplicationEvent()
+            throws Exception {
+        final ZooKeeper zk = TestUtils.createZkClient();
+        // 1. instantiate remote S4 app
+        forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                "s4_core_conf_fs_backend.xml");
+        // TODO synchro
+        Thread.sleep(4000);
+
+        CountDownLatch signalValue1Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+
+        // 2. generate a simple event that changes the state of the PE
+        // --> this event triggers recovery
+        // we inject a value for value2 field (was for value1 in the
+        // checkpointing test). This should trigger recovery and provide
+        // a pe with value1 and value2 set: value1 from recovery, and
+        // value2 from injected event.
+        // NOTE(review): the code right below injects value1 first; the
+        // value2 injection happens after the app is re-forked further down.
+        EventGenerator gen = new EventGenerator();
+        gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1", 0);
+        signalValue1Set.await();
+        final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed,
+                zk);
+        // trigger checkpoint
+        gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+                "Stream1", 0);
+        signalCheckpointed.await();
+        // signalCheckpointAddedByBK.await();
+
+        signalValue1Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+        gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
+        signalValue1Set.await();
+        Assert.assertEquals("value1=message1b ; value2=",
+                TestUtils.readFile(StatefulTestPE.DATA_FILE));
+
+        Thread.sleep(2000);
+        // kill app
+        forkedS4App.destroy();
+        // S4App.killS4App(getClass().getName());
+
+        StatefulTestPE.DATA_FILE.delete();
+
+        forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                "s4_core_conf_fs_backend.xml");
+        // TODO synchro
+        Thread.sleep(2000);
+        // trigger recovery by sending application event to set value 2
+        CountDownLatch signalValue2Set = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+
+        gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
+        signalValue2Set.await(10, TimeUnit.SECONDS);
+
+        // we should get "message1" (checkpointed) instead of "message1b"
+        // (latest)
+        Assert.assertEquals("value1=message1 ; value2=message2",
+                TestUtils.readFile(StatefulTestPE.DATA_FILE));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4App.java b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
new file mode 100644
index 0000000..bd4a6df
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
@@ -0,0 +1,201 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.processor.PEContainer;
+import org.apache.s4.util.Watcher;
+import org.apache.s4.util.clock.Clock;
+import org.apache.s4.util.clock.EventClock;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ * 
+ *
+ */
+public class S4App {
+
+    String configType = "typical";
+    long seedTime = 0;
+    ApplicationContext appContext = null;
+    ApplicationContext adapterContext = null;
+    private String configBase;
+    boolean configPathsInitialized = false;
+    private String[] coreConfigFileUrls;
+    private Class testClass;
+    private String s4CoreConfFileName;
+    public static File DEFAULT_TEST_OUTPUT_DIR = new File(
+            System.getProperty("user.dir") + File.separator + "tmp");
+    public static File DEFAULT_STORAGE_DIR = new File(
+            DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+                    + "storage");
+
+    public static String lockDirPath = System.getProperty("user.dir")
+            + File.separator + "tmp" + File.separator + "lock";
+
+    private S4App() {}
+    
+    public S4App(Class testClass, String s4CoreConfFileName) throws Exception {
+        this.testClass = testClass;
+        this.s4CoreConfFileName = s4CoreConfFileName;
+        initConfigPaths(testClass, s4CoreConfFileName);
+    }
+    /**
+     * @param args
+     * @throws Exception
+     */
+    public static void main(String[] args) throws Exception {
+        Class testClass = Class.forName(args[0]);
+        String s4CoreConfFile = args[1];
+        S4App app = new S4App(testClass, s4CoreConfFile);
+        S4TestCase.initS4Parameters();
+        app.initializeS4App();
+
+    }
+
+    /**
+     * Performs dependency injection and starts the S4 platform.
+     */
+    public void initializeS4App()
+            throws Exception {
+        initConfigPaths(testClass, s4CoreConfFileName);
+        ApplicationContext coreContext = null;
+
+        coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls,
+                coreContext);
+        ApplicationContext context = coreContext;
+
+        Clock clock = (Clock) context.getBean("clock");
+        if (clock instanceof EventClock && seedTime > 0) {
+            EventClock s4EventClock = (EventClock) clock;
+            s4EventClock.updateTime(seedTime);
+            System.out.println("Intializing event clock time with seed time "
+                    + s4EventClock.getCurrentTime());
+        }
+
+        PEContainer peContainer = (PEContainer) context.getBean("peContainer");
+
+        Watcher w = (Watcher) context.getBean("watcher");
+        w.setConfigFilename(configBase + s4CoreConfFileName);
+
+        // load extension modules
+        // String[] configFileNames = getModuleConfigFiles(extsHome, prop);
+        // if (configFileNames.length > 0) {
+        // String[] configFileUrls = new String[configFileNames.length];
+        // for (int i = 0; i < configFileNames.length; i++) {
+        // configFileUrls[i] = "file:" + configFileNames[i];
+        // }
+        // context = new FileSystemXmlApplicationContext(configFileUrls,
+        // context);
+        // }
+
+        // load application modules
+        String applicationConfigFileName = configBase + "app_conf.xml";
+        String[] configFileUrls = new String[] { "file:"
+                + applicationConfigFileName };
+        context = new FileSystemXmlApplicationContext(configFileUrls, context);
+        // attach any beans that extend AbstractPE to the PE
+        // container
+        String[] processingElementBeanNames = context
+                .getBeanNamesForType(AbstractPE.class);
+        for (String processingElementBeanName : processingElementBeanNames) {
+            Object bean = context.getBean(processingElementBeanName);
+            try {
+                Method getS4ClockMethod = bean.getClass().getMethod(
+                        "getClock");
+
+                if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
+                    if (getS4ClockMethod.invoke(bean) == null) {
+                        Method setS4ClockMethod = bean.getClass().getMethod(
+                                "setClock", Clock.class);
+                        setS4ClockMethod.invoke(bean,
+                                coreContext.getBean("clock"));
+                    }
+                }
+                ((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+            } catch (NoSuchMethodException mnfe) {
+                // acceptable
+            }
+            System.out.println("Adding processing element with bean name "
+                    + processingElementBeanName + ", id "
+                    + ((AbstractPE) bean).getId());
+            peContainer.addProcessor((AbstractPE) bean);
+        }
+
+        appContext = context;
+    }
+    
+    
+
+    private void initConfigPaths(Class testClass, String s4CoreConfFileName)
+            throws IOException {
+        if (!configPathsInitialized) {
+            S4TestCase.initS4Parameters();
+            String testDir = testClass.getPackage().getName()
+                    .replace('.', File.separatorChar);
+
+            ClassPathResource propResource = new ClassPathResource(
+                    "s4_core.properties");
+            Properties prop = new Properties();
+            if (propResource.exists()) {
+                prop.load(propResource.getInputStream());
+            } else {
+                System.err
+                        .println("Unable to find s4_core.properties. It must be available in classpath");
+                Thread.dumpStack();
+                System.exit(12);
+            }
+
+            configBase = System.getProperty("user.dir") + File.separator
+                    + "src" + File.separator + "test" + File.separator + "java"
+                    + File.separator + testDir + File.separator;
+            String configPath = configBase + File.separatorChar
+                    + "wall_clock.xml";
+            List<String> coreConfigUrls = new ArrayList<String>();
+            coreConfigUrls.add(configPath);
+
+            // load core config xml
+            if (s4CoreConfFileName != null) {
+                // may be null for adapter
+                configPath = configBase + s4CoreConfFileName;
+                File configFile = new File(configPath);
+                if (!configFile.exists()) {
+                    System.err.printf(
+                            "S4 core config file %s does not exist\n",
+                            configPath);
+                    Thread.dumpStack();
+                    System.exit(13);
+                }
+                coreConfigUrls.add(configPath);
+            }
+            String[] coreConfigFiles = new String[coreConfigUrls.size()];
+            coreConfigUrls.toArray(coreConfigFiles);
+
+            coreConfigFileUrls = new String[coreConfigFiles.length];
+            for (int i = 0; i < coreConfigFiles.length; i++) {
+                coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
+            }
+            configPathsInitialized = true;
+
+        }
+    }
+    
+    public void destroy() {
+        ((FileSystemXmlApplicationContext)appContext).close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java b/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
new file mode 100644
index 0000000..6095e4e
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
@@ -0,0 +1,52 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.springframework.context.ApplicationContext;
+
+public class S4TestCase {
+
+    String configType = "typical";
+    long seedTime = 0;
+    ApplicationContext appContext = null;
+    ApplicationContext adapterContext = null;
+    boolean configPathsInitialized = false;
+    public static File DEFAULT_TEST_OUTPUT_DIR = new File(
+            System.getProperty("user.dir") + File.separator + "tmp");
+    public static File DEFAULT_STORAGE_DIR = new File(
+            DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+                    + "storage");
+    // use a static map to track PE instances
+    public static final Map<Object, AbstractPE> registeredPEs = new Hashtable<Object, AbstractPE>();
+
+    
+    @BeforeClass
+    public static void cleanLocks() {
+        TestUtils.cleanupTmpDirs();
+    }
+    
+
+    @BeforeClass
+    public static void initS4Parameters() throws IOException {
+    
+        System.setProperty("commlayer_mode", "static");
+        System.setProperty("commlayer.mode", "static");
+        System.setProperty("DequeueCount", "6");
+        System.setProperty("lock_dir", S4App.lockDirPath);
+        File lockDir = new File(S4App.lockDirPath);
+        if (!lockDir.exists()) {
+            if (!lockDir.mkdirs()) {
+                throw new RuntimeException("Cannot create directory: ["+lockDir.getAbsolutePath()+"]");
+            }
+        } else {
+            TestUtils.deleteDirectoryContents(lockDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java b/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
new file mode 100644
index 0000000..c38d8d3
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
@@ -0,0 +1,54 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.listener.EventProducer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Minimal {@link EventProducer} used by tests: each message is wrapped into an
+ * {@link EventWrapper} for the configured stream and passed, in the calling
+ * thread, to every registered handler.
+ */
+public class SimpleEventProducer implements EventProducer {
+
+	private Set<EventHandler> handlers = new HashSet<EventHandler>();
+	private String streamName;
+
+	LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+	/** Nothing to initialize. */
+	public void init() {
+	}
+
+	@Override
+	public void addHandler(EventHandler handler) {
+		handlers.add(handler);
+	}
+
+	@Override
+	public boolean removeHandler(EventHandler handler) {
+		return handlers.remove(handler);
+	}
+
+	public void setStreamName(String streamName) {
+		this.streamName = streamName;
+	}
+
+	public String getStreamName() {
+		return streamName;
+	}
+
+	/**
+	 * Wraps the message into a single event and delivers it to all handlers.
+	 * A failure in one handler is printed and does not prevent delivery to
+	 * the remaining handlers.
+	 */
+	// TODO JSON-like stuff
+	public void produceEvent(String message) {
+		final EventWrapper wrapped = new EventWrapper(streamName, message, null);
+		for (EventHandler target : handlers) {
+			try {
+				target.processEvent(wrapped);
+			} catch (Exception e) {
+				e.printStackTrace();
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
new file mode 100644
index 0000000..cfb9132
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
@@ -0,0 +1,136 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class StatefulTestPE extends AbstractPE implements Watcher {
+
+    String id;
+    String value1 = "";
+    String value2 = "";
+    transient ZooKeeper zk = null;
+    transient public static File DATA_FILE = new File(
+            System.getProperty("user.dir")
+            + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public void output() {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void processEvent(KeyValue event) {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+        if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
+            S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
+        }
+        try {
+
+            if ("value1".equals(event.getKey())) {
+                setValue1(event.getValue());
+                zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } else if ("value2".equals(event.getKey())) {
+                setValue2(event.getValue());
+                zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+            } else if ("initiateCheckpoint".equals(event.getKey())) {
+                initiateCheckpoint();
+            } else {
+                throw new RuntimeException("unidentified event: " + event);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public String getValue1() {
+        return value1;
+    }
+
+    public void setValue1(String value1) {
+        this.value1 = value1;
+        persistValues();
+    }
+
+    public String getValue2() {
+        return value2;
+    }
+
+    public void setValue2(String value2) {
+        this.value2 = value2;
+        persistValues();
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    protected void checkpoint() {
+        super.checkpoint();
+        try {
+            zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    // NOTE: we use a file as a simple way to keep track of changes
+    private void persistValues() {
+
+        if (DATA_FILE.exists()) {
+            if (!DATA_FILE.delete()) {
+                throw new RuntimeException("Cannot delete datafile "
+                        + DATA_FILE.getAbsolutePath());
+            }
+        }
+        try {
+            if (!DATA_FILE.createNewFile()) {
+                throw new RuntimeException("Cannot create datafile "
+                        + DATA_FILE.getAbsolutePath());
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot create datafile "
+                    + DATA_FILE.getAbsolutePath());
+        }
+        try {
+            TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
+                    DATA_FILE);
+        } catch (IOException e) {
+            throw new RuntimeException("Cannot write to datafile "
+                    + DATA_FILE.getAbsolutePath());
+        }
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+        // TODO Auto-generated method stub
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java b/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
new file mode 100644
index 0000000..f3e620c
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
@@ -0,0 +1,146 @@
+package org.apache.s4.ft;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRedisStateStorage {
+
+    private static Process redisDaemon;
+    private static final String PAYLOAD = "payload";
+    private RedisStateStorage storage;
+
+    /**
+     * Starts the redis server executable selected by
+     * findCompiledRedisForPlatform() from the current working directory, then
+     * reads at most 4 lines of its output, stopping early when the "ready"
+     * line for port 6379 shows up or the output ends.
+     */
+    @BeforeClass
+    public static void runRedis() throws IOException {
+        List<String> cmdList = new ArrayList<String>();
+
+        cmdList.add(findCompiledRedisForPlatform());
+        ProcessBuilder pb = new ProcessBuilder(cmdList);
+        pb.directory(new File(System.getProperty("user.dir")));
+        // the no-arg redirectErrorStream() only reads the flag; pass true so
+        // that stderr is actually merged into the stream read below
+        pb.redirectErrorStream(true);
+        redisDaemon = pb.start();
+        BufferedReader br = new BufferedReader(new InputStreamReader(
+                redisDaemon.getInputStream()));
+        String s;
+        int maxLinesBeforeOK=  4;
+        for (int i = 0; i < maxLinesBeforeOK; i++) {
+            if ((s=br.readLine())!=null) {
+                if (s.contains("The server is now ready to accept connections on port 6379")) {
+                    break;
+                }
+            } else {
+                break;
+            }
+        }
+    }
+
+    private static String findCompiledRedisForPlatform() {
+        // TODO add compiled versions for other platforms/architectures
+        String osName = System.getProperty("os.name");
+        String osArch = System.getProperty("os.arch");
+        if (osName.equalsIgnoreCase("Mac OS X")) {
+            if (osArch.equalsIgnoreCase("x86_64")) {
+                return "src/test/resources/macosx/redis-server-64bits";
+            }
+        }
+        if (osName.equalsIgnoreCase("Linux")) {
+            if (osArch.equalsIgnoreCase("i386")) {
+                return "src/test/resources/linux/redis-server-32bits";
+            }
+        }
+        if (!new File(System.getProperty("user.dir")
+                + "/src/test/resources/redis-server").exists()) {
+            Assert.fail("Could not find a redis server executable for your platform. Please place an executable redis server version compiled for your platform in s4-core/src/test/resources, named 'redis-server'");
+        }
+        return "src/test/resources/redis-server";
+
+    }
+
+    /** Gives every test a storage client obtained from clearRedis(). */
+    @Before
+    public void setUp() throws Exception {
+        storage = clearRedis();
+    }
+
+    /**
+     * Creates a storage client for localhost:6379, initializes it and calls
+     * clear() on it.
+     *
+     * @return the initialized and cleared storage client
+     */
+    public static RedisStateStorage clearRedis() {
+        final RedisStateStorage redis = new RedisStateStorage();
+        redis.setRedisHost("localhost");
+        redis.setRedisPort(6379);
+        redis.init();
+        redis.clear();
+        return redis;
+    }
+
+    /**
+     * Saves a payload under one key, waits for the storage callback, then
+     * checks that fetching the key returns the same payload.
+     */
+    @Test
+    public void testFetchState() throws IOException, InterruptedException {
+        SafeKeeperId key = new SafeKeeperId("prototype", "key");
+        final CountDownLatch signalAllSaved = new CountDownLatch(1);
+        storage.saveState(key, PAYLOAD.getBytes(), new StorageCallback() {
+            @Override
+            public void storageOperationResult(StorageResultCode resultCode, Object message) {
+                signalAllSaved.countDown();
+            }
+        });
+        // report a timeout as such, instead of failing later on the fetched data
+        Assert.assertTrue("timed out waiting for the save callback",
+                signalAllSaved.await(5, TimeUnit.SECONDS));
+        byte[] result = storage.fetchState(key);
+        String recovered = new String(result);
+        assertEquals(PAYLOAD, recovered);
+    }
+
+    /**
+     * Saves the same payload under 10 distinct keys, waits for all storage
+     * callbacks, then checks that the stored key set equals the saved one.
+     */
+    @Test
+    public void testFetchStoredKeys() throws InterruptedException {
+        Set<SafeKeeperId> fixture = new HashSet<SafeKeeperId>();
+        for (int i = 0; i < 10; i++) {
+            fixture.add(new SafeKeeperId("prototype", "key" + i));
+        }
+        // one count per key, so the latch size always follows the fixture
+        final CountDownLatch signalAllSaved = new CountDownLatch(fixture.size());
+        for (SafeKeeperId skid : fixture) {
+            storage.saveState(skid, PAYLOAD.getBytes(), new StorageCallback() {
+                @Override
+                public void storageOperationResult(StorageResultCode resultCode, Object message) {
+                    signalAllSaved.countDown();
+                }
+            });
+        }
+
+        // report a timeout as such, instead of failing later on the key set
+        Assert.assertTrue("timed out waiting for the save callbacks",
+                signalAllSaved.await(5, TimeUnit.SECONDS));
+        // retrieve the keys
+        Set<SafeKeeperId> result = storage.fetchStoredKeys();
+        assertEquals(fixture.size(), result.size());
+        assertEquals(fixture, result);
+    }
+
+    /**
+     * Destroys the redis process started by runRedis() and waits for it to
+     * exit, printing the exit code when it is not 0.
+     */
+    @AfterClass
+    public static void stopRedis() throws InterruptedException {
+        if (redisDaemon == null) {
+            // runRedis() failed before the process was started: nothing to stop
+            return;
+        }
+        redisDaemon.destroy();
+        int exitcode = redisDaemon.waitFor();
+        if (exitcode != 0)
+            System.out.println("Redis exited with non zero exit code: "
+                    + exitcode);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
new file mode 100644
index 0000000..84af34f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -0,0 +1,399 @@
+package org.apache.s4.ft;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Contains static methods that can be used in tests for things such as:
+ * - files utilities: strings <-> files conversion, directory recursive delete etc...
+ * - starting local instances for zookeeper and bookkeeper
+ * - distributed latches through zookeeper
+ * - etc...
+ *
+ */
+public class TestUtils {
+
+    public static final int ZK_PORT = 21810;
+    /**
+     * Starts S4App in a separate JVM that reuses the current classpath and
+     * working directory, echoes the output of that JVM to System.out from a
+     * background thread, and returns after a fixed 2.5 s start-up delay.
+     *
+     * @param testClassName first argument passed to S4App
+     * @param s4CoreConfFileName second argument passed to S4App
+     * @return the forked process
+     */
+    public static Process forkS4App(String testClassName, String s4CoreConfFileName) throws IOException,
+            InterruptedException {
+        List<String> cmdList = new ArrayList<String>();
+        cmdList.add("java");
+        cmdList.add("-cp");
+        cmdList.add(System.getProperty("java.class.path"));
+        cmdList.add("-Dcommlayer_mode=static");
+        cmdList.add("-Dcommlayer.mode=static");
+        cmdList.add("-Dlock_dir=" + S4App.lockDirPath);
+        cmdList.add("-Dlog4j.configuration=file://"
+                + System.getProperty("user.dir")
+                + "/src/test/resources/log4j.xml");
+//        cmdList.add("-Xdebug");
+//        cmdList.add("-Xnoagent");
+//        cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+        cmdList.add(S4App.class.getName());
+        cmdList.add(testClassName);
+        cmdList.add(s4CoreConfFileName);
+
+        ProcessBuilder pb = new ProcessBuilder(cmdList);
+        pb.directory(new File(System.getProperty("user.dir")));
+        // the no-arg redirectErrorStream() only reads the flag; pass true so
+        // that stderr is merged into the stream drained below
+        pb.redirectErrorStream(true);
+        final Process process = pb.start();
+
+        // start draining right away, so that the child cannot block on a full
+        // output pipe while it is starting up
+        new Thread(new Runnable() {
+            public void run() {
+                BufferedReader br = new BufferedReader(new InputStreamReader(
+                        process.getInputStream()));
+                String line;
+                try {
+                    line = br.readLine();
+                    while (line != null) {
+                        System.out.println(line);
+                        line = br.readLine();
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }).start();
+
+        // TODO some synchro with s4 platform ready state
+        Thread.sleep(2500);
+
+        return process;
+    }
+
+    /**
+     * Destroys the given forked process; a null argument is ignored. Does not
+     * wait for the process to exit.
+     */
+    public static void killS4App(Process forkedApp) throws IOException,
+            InterruptedException {
+        if (forkedApp != null) {
+            forkedApp.destroy();
+        }
+    }
+
+    /**
+     * Replaces the given file with a new file containing the given string.
+     *
+     * @throws RuntimeException if an existing file cannot be deleted or the
+     *         new file cannot be created
+     * @throws IOException if creating, writing or closing the file fails
+     */
+    public static void writeStringToFile(String s, File f) throws IOException {
+        if (f.exists() && !f.delete()) {
+            throw new RuntimeException("Cannot delete file "
+                    + f.getAbsolutePath());
+        }
+        if (!f.createNewFile()) {
+            throw new RuntimeException("Cannot create new file : "
+                    + f.getAbsolutePath());
+        }
+
+        final FileWriter writer = new FileWriter(f);
+        try {
+            writer.write(s);
+        } finally {
+            writer.close();
+        }
+    }
+
+    public static String readFile(File f) throws IOException {
+        BufferedReader br = null;
+        try {
+            br = new BufferedReader(new FileReader(f));
+            StringBuilder sb = new StringBuilder();
+            String line = br.readLine();
+            while (line != null) {
+                sb.append(line);
+                line = br.readLine();
+                if (line != null) {
+                    sb.append("\n");
+                }
+            }
+            return sb.toString();
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    throw (e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Starts an in-process ZooKeeper server listening on ZK_PORT, with an
+     * emptied data directory tmp/zookeeper/data under user.dir, and asserts
+     * that the server answers within 4 seconds.
+     *
+     * @return the connection factory, to be passed to stopZookeeperServer
+     */
+    public static NIOServerCnxn.Factory startZookeeperServer()
+            throws IOException, InterruptedException, KeeperException {
+
+        final File zkDataDir = new File(System.getProperty("user.dir")
+                + File.separator + "tmp" + File.separator + "zookeeper"
+                + File.separator + "data");
+        if (zkDataDir.exists()) {
+            TestUtils.deleteDirectoryContents(zkDataDir);
+        } else if (!zkDataDir.mkdirs() && !zkDataDir.isDirectory()) {
+            // fail here with the offending path rather than inside the server
+            throw new IOException("Cannot create directory: "
+                    + zkDataDir.getAbsolutePath());
+        }
+
+        ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
+        NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(
+                new InetSocketAddress(ZK_PORT));
+        nioZookeeperConnectionFactory.startup(zks);
+        Assert.assertTrue("waiting for server being up",
+                waitForServerUp("localhost", ZK_PORT, 4000));
+        return nioZookeeperConnectionFactory;
+
+    }
+
+    /**
+     * Shuts down the given connection factory and asserts that the server on
+     * ZK_PORT stops answering within 3 seconds. A null factory is ignored.
+     */
+    public static void stopZookeeperServer(NIOServerCnxn.Factory f)
+            throws IOException, InterruptedException {
+        if (f == null) {
+            return;
+        }
+        f.shutdown();
+        Assert.assertTrue("waiting for server down",
+                waitForServerDown("localhost", ZK_PORT, 3000));
+    }
+
+    /**
+     * Recursively deletes everything inside the given directory; the
+     * directory itself is kept.
+     *
+     * @throws RuntimeException if the directory cannot be listed or one of
+     *         its entries cannot be deleted
+     */
+    public static void deleteDirectoryContents(File dir) {
+        File[] files = dir.listFiles();
+        if (files == null) {
+            // listFiles() returns null when dir is not a directory or cannot be read
+            throw new RuntimeException("could not list contents of : " + dir);
+        }
+        for (File file : files) {
+            if (file.isDirectory()) {
+                deleteDirectoryContents(file);
+            }
+            if (!file.delete()) {
+                throw new RuntimeException("could not delete : " + file);
+            }
+        }
+    }
+
+    /**
+     * Returns the lines of a text file joined with "\n", without a trailing
+     * line separator. Delegates to readFile, which performs the same
+     * conversion and closes the underlying reader.
+     */
+    public static String readFileAsString(File f) throws IOException {
+        return readFile(f);
+    }
+
+    // TODO factor this code (see BasicFSStateStorage) - or use commons io or
+    // guava
+    /**
+     * Reads a whole file into a byte array sized from File.length().
+     *
+     * @throws IOException if the file is larger than Integer.MAX_VALUE bytes
+     *         or ends before the expected number of bytes has been read
+     */
+    public static byte[] readFileAsByteArray(File file) throws Exception {
+        FileInputStream in = null;
+        try {
+            in = new FileInputStream(file);
+
+            final long length = file.length();
+            // array sizes are ints: reject files that do not fit
+            if (length > Integer.MAX_VALUE) {
+                throw new IOException("Error file is too large: "
+                        + file.getName() + " " + length + " bytes");
+            }
+
+            final byte[] buffer = new byte[(int) length];
+            int filled = 0;
+            while (filled < buffer.length) {
+                final int count = in.read(buffer, filled, buffer.length - filled);
+                if (count < 0) {
+                    // end of stream reached before the buffer is full
+                    break;
+                }
+                filled += count;
+            }
+
+            if (filled < buffer.length) {
+                throw new IOException("Error, could not read entire file: "
+                        + file.getName() + " " + filled + "/" + buffer.length
+                        + " bytes read");
+            }
+            return buffer;
+        } finally {
+            if (in != null) {
+                in.close();
+            }
+        }
+    }
+
+    /**
+     * Creates a ZooKeeper client for localhost:ZK_PORT with a 4000 ms session
+     * timeout and a default watcher that ignores every event.
+     */
+    public static ZooKeeper createZkClient() throws IOException {
+        final Watcher ignoreAll = new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+            }
+        };
+        return new ZooKeeper("localhost:" + ZK_PORT, 4000, ignoreAll);
+    }
+
+    /**
+     * Deletes the node at the given path when it exists, then sets a watch on
+     * that path which counts the latch down when a NodeCreated event is
+     * received.
+     */
+    public static void watchAndSignalCreation(String path,
+            final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        // start from a state where the node is absent (-1 matches any version)
+        if (zk.exists(path, false) != null) {
+            zk.delete(path, -1);
+        }
+        zk.exists(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeCreated.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+    
+    /**
+     * Sets a watch on the children of the given path which counts the latch
+     * down when a NodeChildrenChanged event is received.
+     */
+    public static void watchAndSignalChangedChildren(String path,
+            final CountDownLatch latch, final ZooKeeper zk)
+            throws KeeperException, InterruptedException {
+
+        zk.getChildren(path, new Watcher() {
+            @Override
+            public void process(WatchedEvent event) {
+                if (EventType.NodeChildrenChanged.equals(event.getType())) {
+                    latch.countDown();
+                }
+            }
+        });
+    }
+
+    // from zookeeper's codebase
+    /**
+     * Polls the server with the "stat" command every 250 ms until the answer
+     * starts with "Zookeeper version:" or the timeout (ms) has elapsed.
+     *
+     * @return true when the server answered in time, false otherwise
+     */
+    public static boolean waitForServerUp(String host, int port, long timeout) {
+        final long deadline = System.currentTimeMillis() + timeout;
+        for (;;) {
+            try {
+                // if there are multiple hostports, just take the first one
+                if (send4LetterWord(host, port, "stat").startsWith("Zookeeper version:")) {
+                    return true;
+                }
+            } catch (IOException ignored) {
+                // ignore as this is expected
+            }
+
+            if (System.currentTimeMillis() > deadline) {
+                return false;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    // from zookeeper's codebase
+    /**
+     * Opens a socket to host:port, sends the given command and returns the
+     * whole answer, every received line being terminated with "\n".
+     *
+     * @throws IOException if the connection or the exchange fails
+     */
+    public static String send4LetterWord(String host, int port, String cmd)
+            throws IOException {
+        Socket sock = new Socket(host, port);
+        BufferedReader reader = null;
+        try {
+            OutputStream outstream = sock.getOutputStream();
+            outstream.write(cmd.getBytes());
+            outstream.flush();
+            // this replicates NC - close the output stream before reading
+            sock.shutdownOutput();
+
+            reader = new BufferedReader(new InputStreamReader(
+                    sock.getInputStream()));
+            StringBuilder sb = new StringBuilder();
+            String line;
+            // read until the peer closes its side of the connection
+            while ((line = reader.readLine()) != null) {
+                sb.append(line + "\n");
+            }
+            return sb.toString();
+        } finally {
+            sock.close();
+            if (reader != null) {
+                reader.close();
+            }
+        }
+    }
+
+    // from zookeeper's codebase
+    /**
+     * Polls the server with the "stat" command every 250 ms until the command
+     * fails with an IOException or the timeout (ms) has elapsed.
+     *
+     * @return true when the server stopped answering in time, false otherwise
+     */
+    public static boolean waitForServerDown(String host, int port, long timeout) {
+        final long deadline = System.currentTimeMillis() + timeout;
+        for (;;) {
+            try {
+                send4LetterWord(host, port, "stat");
+            } catch (IOException e) {
+                // the server cannot be reached anymore
+                return true;
+            }
+
+            if (System.currentTimeMillis() > deadline) {
+                return false;
+            }
+            try {
+                Thread.sleep(250);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
+    /**
+     * Empties S4TestCase.DEFAULT_TEST_OUTPUT_DIR when it exists, then creates
+     * S4TestCase.DEFAULT_STORAGE_DIR (the result of mkdirs() is not checked).
+     */
+    public static void cleanupTmpDirs() {
+        if (S4TestCase.DEFAULT_TEST_OUTPUT_DIR.exists()) {
+            deleteDirectoryContents(S4TestCase.DEFAULT_TEST_OUTPUT_DIR);
+        }
+        S4TestCase.DEFAULT_STORAGE_DIR.mkdirs();
+    
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/adapter.properties b/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
new file mode 100644
index 0000000..ae5169d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
@@ -0,0 +1,2 @@
+appName=s4
+listenerAppName=s4
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
new file mode 100644
index 0000000..f19c1e7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Spring configuration of the adapter used by the fault tolerance tests.
+     Bean classes use the org.apache.s4 packages introduced by the package rename. -->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- resolves the ${...} placeholders below from adapter.properties -->
+  <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+    <property name="location">
+      <value>classpath:adapter.properties</value>
+    </property>
+    <property name="ignoreUnresolvablePlaceholders" value="true"/>
+  </bean>
+
+  <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter" init-method="init">
+    <property name="serDeser" ref="serDeser"/>
+    <property name="appName" value="${appName}"/>
+    <property name="listenerAppName" value="${listenerAppName}"/>
+    <property name="monitor" ref="monitor"/>
+  </bean>
+
+  <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser"/>
+
+  <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true" init-method="init">
+    <property name="flushInterval" value="30"/>
+    <property name="loggerName" value="monitor"/>
+  </bean>
+
+  <bean id="dummyPartitioner" class="org.apache.s4.dispatcher.partitioner.DummyPartitioner"/>
+
+  <bean id="dispatcher" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="dummyPartitioner"/>
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitter"/>
+    <property name="loggerName" value="s4"/>
+  </bean>
+
+  <bean id="adapter" class="org.apache.s4.adapter.Adapter"
+        init-method="init">
+    <property name="dispatcher" ref="dispatcher"/>
+  </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
new file mode 100644
index 0000000..abdf03e
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+            http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+
+  <!-- SimpleEventProducer is declared in package org.apache.s4.ft -->
+  <bean id="eventProducer" class="org.apache.s4.ft.SimpleEventProducer"
+        init-method="init">
+    <property name="streamName" value="Default"/>
+  </bean>
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
new file mode 100644
index 0000000..3eec6ec
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
+    <property name="id" value="printEventPE"/>
+    <property name="keys">
+      <list>
+        <value>TopicSeen topic</value>
+      </list>
+    </property>
+  </bean> -->
+
+  <!-- StatefulTestPE is declared in package org.apache.s4.ft -->
+  <bean id="statefulPE" class="org.apache.s4.ft.StatefulTestPE">
+    <property name="id" value="statefulPE"/>
+    <property name="keys">
+      <list>
+        <value>Stream1 key</value>
+      </list>
+    </property>
+    <!-- we set the frequency to 1000 so that checkpointing does NOT get triggered automatically! -->
+    <property name="checkpointingFrequencyByEventCount" value="1000" />
+  </bean>
+
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
new file mode 100644
index 0000000..6b35e97
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter). 
+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="io.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="io.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="bkStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+    
+    <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+
+    <bean id="bkStateStorage" class="io.s4.ft.BookKeeperStateStorage" init-method="init">
+        <!-- if not specified, default is <current_dir>/tmp/storage 
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+        <property name="zkServers" value="localhost:21810"/>
+    </bean>
+
+
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..57d67d7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+	<bean id="propertyConfigurer"
+		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+		<property name="location">
+			<value>classpath:s4_core.properties</value>
+		</property>
+		<property name="properties">
+			<props>
+				<prop key="kryoSerDeser.initialBufferSize">2048</prop>
+				<prop key="kryoSerDeser.maxBufferSize">262144</prop>
+			</props>
+		</property>
+		<property name="ignoreUnresolvablePlaceholders" value="true" />
+	</bean>
+
+	<bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
+
+	<bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="listenerAppName" value="${adapter_app_name}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="listener" ref="rawListener" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
+		<property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+		<property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+	</bean>
+
+	<!--START: Dispatchers for control event processor. If stream name in Response 
+		is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter). 
+		Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+	<bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
+		<property name="dispatchers">
+			<list>
+				<ref bean="ctrlDispatcherFilteredS4" />
+				<ref bean="ctrlDispatcherFilteredAdapter" />
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherAdapter" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
+		<property name="dispatcher" ref="ctrlDispatcherS4" />
+		<property name="streams">
+			<list>
+				<value>@${adapter_app_name}</value>
+			</list>
+		</property>
+	</bean>
+
+	<bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+
+	<bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="genericPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+	<!-- END: Dispatchers for control events -->
+
+	<!-- Control Events handler -->
+	<bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
+		<property name="dispatcher" ref="ctrlDispatcher" />
+	</bean>
+
+	<bean id="peContainer" class="io.s4.processor.PEContainer"
+		init-method="init" lazy-init="true">
+		<property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+		<property name="trackByKey" value="true" />
+		<property name="clock" ref="clock" />
+		<property name="controlEventProcessor" ref="ctrlHandler" />
+		<property name="safeKeeper" ref="safeKeeper" />
+	</bean>
+
+	<bean id="rawListener" class="io.s4.listener.CommLayerListener"
+		init-method="init">
+		<property name="serDeser" ref="serDeser" />
+		<property name="clusterManagerAddress" value="${zk_address}" />
+		<property name="appName" value="${s4_app_name}" />
+		<property name="maxQueueSize" value="${listener_max_queue_size}" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="eventListener" class="io.s4.collector.EventListener"
+		init-method="init">
+		<property name="rawListener" ref="rawListener" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="monitor" ref="monitor" />
+	</bean>
+
+	<bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
+		init-method="init">
+		<property name="flushInterval" value="30" />
+		<property name="loggerName" value="monitor" />
+	</bean>
+
+	<bean id="watcher" class="io.s4.util.Watcher" init-method="init"
+		lazy-init="true">
+		<property name="monitor" ref="monitor" />
+		<property name="peContainer" ref="peContainer" />
+		<property name="minimumMemory" value="52428800" />
+	</bean>
+
+
+
+
+	<!-- Some useful beans related to client-adapter for apps -->
+
+	<!-- Dispatcher to send to all adapter nodes. -->
+	<bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="broadcastPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<!-- Partitioner to achieve broadcast -->
+	<bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+	<bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="fsStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+        <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+    </bean>
+    
+    <bean id="loggingStorageCallbackFactory" class="io.s4.ft.LoggingStorageCallbackFactory"/>
+    
+    <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+        <!-- if not specified, default is <current_dir>/tmp/storage 
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+    
+
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans              http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ 
+  <bean id="clock" class="io.s4.util.clock.WallClock"/>
+ 
+</beans>

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
new file mode 100644
index 0000000..37a68d1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -0,0 +1,159 @@
+package org.apache.s4.ft.wordcount;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestRedisStateStorage;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.wordcount.WordCountTest;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * 
+ * We inject 3 sentences into a word counting s4 system.
+ * 
+ * After processing the first sentence, we kill the platform and restart it;
+ * we do the same after the second sentence.
+ * 
+ * Then we inject the third sentence.
+ * 
+ * 
+ * We verify that no state was lost, i.e. that the word counts include words
+ * from all three sentences.
+ * 
+ * NOTE 1: we synchronize through zookeeper to control when to kill the
+ * application, and when to verify assertions.
+ * 
+ * NOTE 2: we use some additional explicit waits for the bookkeeper backend so
+ * that it gets correctly initialized.
+ */
+public class FTWordCountTest extends S4TestCase {
+
+    private static Factory zookeeperServerConnectionFactory;
+    private static final String FILESYSTEM_BACKEND_CONF = "s4_core_conf_fs_backend.xml";
+    private static final String REDIS_BACKEND_CONF = "s4_core_conf_redis_backend.xml";
+    private Process forkedS4App = null;
+
+    @Test
+    public void testFileSystemBackend() throws Exception {
+        doTestCheckpointingAndRecovery(FILESYSTEM_BACKEND_CONF);
+    }
+
+    @Test
+    public void testRedisBackend() throws Exception {
+        TestRedisStateStorage.runRedis();
+        TestRedisStateStorage.clearRedis();
+        doTestCheckpointingAndRecovery(REDIS_BACKEND_CONF);
+        TestRedisStateStorage.stopRedis();
+    }
+
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        S4TestCase.initS4Parameters();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        if (forkedS4App != null) {
+            forkedS4App.destroy();
+        }
+    }
+
+    // Sends one sentence, waits for all of its words to be processed, then
+    // crashes and restarts the app. This is done after sentences 1 and 2;
+    // sentence 3 is handled by the last restarted instance, to make sure that
+    // recovery does not introduce side effects. Every wait must succeed.
+    public void doTestCheckpointingAndRecovery(String backendConf)
+            throws Exception {
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
+                zk);
+        EventGenerator gen = new EventGenerator();
+
+        // add authorizations for processing
+        for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+        CountDownLatch signalSentence1Processed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/classifierIteration_"
+                + WordCountTest.SENTENCE_1_TOTAL_WORDS,
+                signalSentence1Processed, zk);
+        gen.injectValueEvent(
+                new KeyValue("sentence", WordCountTest.SENTENCE_1),
+                "Sentences", 0);
+        Assert.assertTrue(signalSentence1Processed.await(10, TimeUnit.SECONDS));
+        Thread.sleep(1000);
+
+
+        // crash the app
+        forkedS4App.destroy();
+
+        // recovering and making sure checkpointing still works
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+
+        // add authorizations for continuing processing. Without these, the
+        // WordClassifier processor keeps waiting
+        for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS
+                + WordCountTest.SENTENCE_2_TOTAL_WORDS; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+
+        CountDownLatch sentence2Processed = new CountDownLatch(1);
+        TestUtils
+                .watchAndSignalCreation(
+                        "/classifierIteration_"
+                                + (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
+                        sentence2Processed, zk);
+
+        gen.injectValueEvent(
+                new KeyValue("sentence", WordCountTest.SENTENCE_2),
+                "Sentences", 0);
+
+        Assert.assertTrue(sentence2Processed.await(10, TimeUnit.SECONDS));
+        Thread.sleep(1000);
+
+        // crash the app
+        forkedS4App.destroy();
+        forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+
+        // add authorizations for continuing processing. Without these, the
+        // WordClassifier processor keeps waiting
+        for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS
+                + WordCountTest.SENTENCE_2_TOTAL_WORDS + 1; i <= WordCountTest.TOTAL_WORDS; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+        gen.injectValueEvent(
+                new KeyValue("sentence", WordCountTest.SENTENCE_3),
+                "Sentences", 0);
+        Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
+        File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                + File.separator + "wordcount");
+        String s = TestUtils.readFile(results);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
new file mode 100644
index 0000000..d1032c0
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://www.springframework.org/schema/beans             http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+	<!-- <bean id="printEventPE" class="io.s4.processor.PrintEventPE"> <property 
+		name="id" value="printEventPE"/> <property name="keys"> <list> <value>TopicSeen 
+		topic</value> </list> </property> </bean> -->
+
+	<bean id="wordSplitter" class="org.apache.s4.wordcount.WordSplitter">
+		<property name="id" value="wordSplitter" />
+		<property name="keys">
+			<list>
+				<value>Sentences *</value>
+			</list>
+		</property>
+		<property name="outputStreamName" value="Words" />
+		<property name="dispatcher" ref="wordDispatcher" />
+	</bean>
+
+	<bean id="wordCounter" class="org.apache.s4.wordcount.WordCounter">
+		<property name="id" value="wordMapper" />
+		<property name="keys">
+			<list>
+				<value>Words word</value>
+			</list>
+		</property>
+		<property name="outputFrequencyByEventCount" value="1" />
+		<property name="outputStreamName" value="WordsCount" />
+		<property name="dispatcher" ref="wordsCountDispatcher" />
+		<property name="checkpointingFrequencyByEventCount" value="1" />
+	</bean>
+
+	<bean id="wordClassifier" class="org.apache.s4.wordcount.WordClassifier">
+		<property name="id" value="wordClassifier" />
+		<property name="keys">
+			<list>
+				<!-- using a common agreed-upon routing key value -->
+				<value>WordsCount routingKey</value>
+			</list>
+		</property>
+		<property name="checkpointingFrequencyByEventCount" value="1" />
+	</bean>
+
+	<bean id="wordDispatcher" class="io.s4.dispatcher.Dispatcher"
+		init-method="init">
+		<property name="partitioners">
+			<list>
+				<ref bean="wordPartitioner" />
+			</list>
+		</property>
+		<property name="eventEmitter" ref="commLayerEmitter" />
+		<property name="loggerName" value="s4" />
+	</bean>
+
+	<bean id="wordPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+		<property name="streamNames">
+			<list>
+				<value>Words</value>
+			</list>
+		</property>
+		<property name="hashKey">
+			<list>
+				<value>word</value>
+			</list>
+		</property>
+		<property name="hasher" ref="hasher" />
+		<property name="debug" value="false" />
+	</bean>
+	
+	<bean id="wordsCountDispatcher" class="io.s4.dispatcher.Dispatcher"
+        init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="wordsCountPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+    
+	<bean id="wordsCountPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
+        <property name="streamNames">
+            <list>
+                <value>WordsCount</value>
+            </list>
+        </property>
+        <property name="hasher" ref="hasher" />
+        <property name="debug" value="false" />
+    </bean>
+	
+	
+	
+</beans>