You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC
[26/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml b/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
deleted file mode 100755
index 20c8bb1..0000000
--- a/s4-core/src/test/java/io/s4/wordcount/s4_core_conf.xml
+++ /dev/null
@@ -1,194 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
- <bean id="propertyConfigurer"
- class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- <property name="location">
- <value>classpath:s4_core.properties</value>
- </property>
- <property name="properties">
- <props>
- <prop key="kryoSerDeser.initialBufferSize">2048</prop>
- <prop key="kryoSerDeser.maxBufferSize">262144</prop>
- </props>
- </property>
- <property name="ignoreUnresolvablePlaceholders" value="true" />
- </bean>
-
- <bean id="hasher" class="io.s4.dispatcher.partitioner.DefaultHasher" />
-
- <bean id="commLayerEmitterToAdapter" class="io.s4.emitter.CommLayerEmitter"
- init-method="init">
- <property name="serDeser" ref="serDeser" />
- <property name="listener" ref="rawListener" />
- <property name="listenerAppName" value="${adapter_app_name}" />
- <property name="monitor" ref="monitor" />
- </bean>
-
- <bean id="commLayerEmitter" class="io.s4.emitter.CommLayerEmitter"
- init-method="init">
- <property name="serDeser" ref="serDeser" />
- <property name="listener" ref="rawListener" />
- <property name="monitor" ref="monitor" />
- </bean>
-
- <bean id="serDeser" class="io.s4.serialize.KryoSerDeser">
- <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
- <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
- </bean>
-
- <!--START: Dispatchers for control event processor. If stream name in Response
- is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
- Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
- <bean id="ctrlDispatcher" class="io.s4.dispatcher.MultiDispatcher">
- <property name="dispatchers">
- <list>
- <ref bean="ctrlDispatcherFilteredS4" />
- <ref bean="ctrlDispatcherFilteredAdapter" />
- </list>
- </property>
- </bean>
-
- <bean id="ctrlDispatcherFilteredAdapter" class="io.s4.dispatcher.StreamSelectingDispatcher">
- <property name="dispatcher" ref="ctrlDispatcherAdapter" />
- <property name="streams">
- <list>
- <value>@${adapter_app_name}</value>
- </list>
- </property>
- </bean>
-
- <bean id="ctrlDispatcherFilteredS4" class="io.s4.dispatcher.StreamExcludingDispatcher">
- <property name="dispatcher" ref="ctrlDispatcherS4" />
- <property name="streams">
- <list>
- <value>@${adapter_app_name}</value>
- </list>
- </property>
- </bean>
-
- <bean id="genericPartitioner" class="io.s4.dispatcher.partitioner.DefaultPartitioner">
- <property name="hasher" ref="hasher" />
- <property name="debug" value="false" />
- </bean>
-
- <bean id="ctrlDispatcherS4" class="io.s4.dispatcher.Dispatcher"
- init-method="init">
- <property name="partitioners">
- <list>
- <ref bean="genericPartitioner" />
- </list>
- </property>
- <property name="eventEmitter" ref="commLayerEmitter" />
- <property name="loggerName" value="s4" />
- </bean>
-
- <bean id="ctrlDispatcherAdapter" class="io.s4.dispatcher.Dispatcher"
- init-method="init">
- <property name="partitioners">
- <list>
- <ref bean="genericPartitioner" />
- </list>
- </property>
- <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
- <property name="loggerName" value="s4" />
- </bean>
- <!-- END: Dispatchers for control events -->
-
- <!-- Control Events handler -->
- <bean id="ctrlHandler" class="io.s4.processor.ControlEventProcessor">
- <property name="dispatcher" ref="ctrlDispatcher" />
- </bean>
-
- <bean id="peContainer" class="io.s4.processor.PEContainer"
- init-method="init" lazy-init="true">
- <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
- <property name="monitor" ref="monitor" />
- <property name="trackByKey" value="true" />
- <property name="clock" ref="clock" />
- <property name="controlEventProcessor" ref="ctrlHandler" />
- <property name="safeKeeper" ref="safeKeeper" />
- </bean>
-
- <bean id="rawListener" class="io.s4.listener.CommLayerListener"
- init-method="init">
- <property name="serDeser" ref="serDeser" />
- <property name="clusterManagerAddress" value="${zk_address}" />
- <property name="appName" value="${s4_app_name}" />
- <property name="maxQueueSize" value="${listener_max_queue_size}" />
- <property name="monitor" ref="monitor" />
- </bean>
-
- <bean id="eventListener" class="io.s4.collector.EventListener"
- init-method="init">
- <property name="rawListener" ref="rawListener" />
- <property name="peContainer" ref="peContainer" />
- <property name="monitor" ref="monitor" />
- </bean>
-
- <bean id="monitor" class="io.s4.logger.Log4jMonitor" lazy-init="true"
- init-method="init">
- <property name="flushInterval" value="30" />
- <property name="loggerName" value="monitor" />
- </bean>
-
- <bean id="watcher" class="io.s4.util.Watcher" init-method="init"
- lazy-init="true">
- <property name="monitor" ref="monitor" />
- <property name="peContainer" ref="peContainer" />
- <property name="minimumMemory" value="52428800" />
- </bean>
-
-
-
-
- <!-- Some useful beans related to client-adapter for apps -->
-
- <!-- Dispatcher to send to all adapter nodes. -->
- <bean id="dispatcherToClientAdapters" class="io.s4.dispatcher.Dispatcher"
- init-method="init">
- <property name="partitioners">
- <list>
- <ref bean="broadcastPartitioner" />
- </list>
- </property>
- <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
- <property name="loggerName" value="s4" />
- </bean>
-
- <!-- Partitioner to achieve broadcast -->
- <bean id="broadcastPartitioner" class="io.s4.dispatcher.partitioner.BroadcastPartitioner" />
-
-
-
- <bean id="loopbackDispatcher" class="io.s4.dispatcher.Dispatcher"
- init-method="init">
- <property name="partitioners">
- <list>
- <ref bean="loopbackPartitioner" />
- </list>
- </property>
- <property name="eventEmitter" ref="commLayerEmitter" />
- <property name="loggerName" value="s4" />
- </bean>
-
- <bean id="loopbackPartitioner" class="io.s4.dispatcher.partitioner.LoopbackPartitioner">
- <property name="eventEmitter" ref="commLayerEmitter"/>
- </bean>
-
- <bean id="safeKeeper" class="io.s4.ft.SafeKeeper" init-method="init">
- <property name="stateStorage" ref="fsStateStorage" />
- <property name="loopbackDispatcher" ref="loopbackDispatcher" />
- <property name="serializer" ref="serDeser"/>
- <property name="hasher" ref="hasher"/>
- </bean>
-
- <bean id="fsStateStorage" class="io.s4.ft.DefaultFileSystemStateStorage" init-method="checkStorageDir">
- <!-- if not specified, default is <current_dir>/tmp/storage
- <property name="storageRootPath" value="${storage_root_path}" /> -->
- </bean>
-
-
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml b/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
deleted file mode 100644
index e149ecc..0000000
--- a/s4-core/src/test/java/io/s4/wordcount/wall_clock.xml
+++ /dev/null
@@ -1,6 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
-
- <bean id="clock" class="io.s4.util.clock.WallClock"/>
-
-</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
new file mode 100644
index 0000000..c0dfa30
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
@@ -0,0 +1,108 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.serialize.KryoSerDeser;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.reflectasm.FieldAccess;
+
+/**
+ * Checks that a checkpoint requested through an application event is written
+ * to disk with the expected serialized content.
+ * <p>
+ * The S4 application runs in-process (started in {@link #prepare()}); the test
+ * and the processing element coordinate through ZooKeeper nodes.
+ */
+public class CheckpointingTest extends S4TestCase {
+
+    /** Longest time, in seconds, the test waits for one ZooKeeper signal. */
+    private static final long SIGNAL_TIMEOUT_SECONDS = 10;
+
+    private static Factory zookeeperServerConnectionFactory = null;
+    private S4App app;
+
+    /**
+     * Starts the ZooKeeper server, then the S4 application configured with
+     * <code>s4_core_conf_fs_backend.xml</code>.
+     */
+    @Before
+    public void prepare() throws Exception {
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
+        app.initializeS4App();
+    }
+
+    /**
+     * Stops the ZooKeeper server and destroys the S4 application.
+     * <p>
+     * The application is destroyed in a <code>finally</code> block so that a
+     * failure while stopping ZooKeeper cannot leave it running.
+     */
+    @After
+    public void cleanup() throws Exception {
+        try {
+            TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        } finally {
+            if (app != null) {
+                app.destroy();
+            }
+        }
+    }
+
+    @Test
+    public void testCheckpointStorage() throws Exception {
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            // 1. the S4 application was started in prepare()
+
+            // 2. generate a simple event that creates and changes the state
+            // of the PE
+
+            // NOTE: coordinate through zookeeper
+            final CountDownLatch signalValue1Set = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+            final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/checkpointed",
+                    signalCheckpointed, zk);
+            EventGenerator gen = new EventGenerator();
+            gen.injectValueEvent(new KeyValue("value1", "message1"),
+                    "Stream1", 0);
+
+            awaitSignal("/value1Set", signalValue1Set);
+            StatefulTestPE pe = (StatefulTestPE) S4TestCase.registeredPEs
+                    .get(new SafeKeeperId("statefulPE", "value"));
+            Assert.assertEquals("message1", pe.getValue1());
+            Assert.assertEquals("", pe.getValue2());
+
+            // 3. generate a checkpoint event
+            gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+                    "Stream1", 0);
+            awaitSignal("/checkpointed", signalCheckpointed);
+
+            // NOTE: the backend has asynchronous save operations
+            Thread.sleep(1000);
+
+            SafeKeeperId safeKeeperId = pe.getSafeKeeperId();
+            File expected = new File(System.getProperty("user.dir")
+                    + File.separator
+                    + "tmp"
+                    + File.separator
+                    + "storage"
+                    + File.separator
+                    + safeKeeperId.getPrototypeId()
+                    + File.separator
+                    + Base64.encodeBase64URLSafeString(safeKeeperId
+                            .getStringRepresentation().getBytes()));
+
+            // 4. verify that state was correctly persisted
+            Assert.assertTrue(expected.exists());
+
+            StatefulTestPE refPE = new StatefulTestPE();
+            refPE.setValue1("message1");
+            refPE.setId("statefulPE");
+            refPE.setKeys(new String[] {});
+            KryoSerDeser kryoSerDeser = new KryoSerDeser();
+            byte[] refBytes = kryoSerDeser.serialize(refPE);
+
+            Assert.assertTrue(Arrays.equals(refBytes,
+                    TestUtils.readFileAsByteArray(expected)));
+
+            // destroy the app before the ZooKeeper server is stopped in
+            // cleanup(); cleanup() destroys it again as a safety net
+            app.destroy();
+        } finally {
+            // release the client connection even when an assertion fails
+            zk.close();
+        }
+    }
+
+    /**
+     * Waits for the given latch and fails the test instead of blocking
+     * forever when the signal never arrives.
+     *
+     * @param node  ZooKeeper path the latch is attached to (used in the
+     *              failure message only)
+     * @param latch latch counted down when the signal is received
+     */
+    private static void awaitSignal(String node, CountDownLatch latch)
+            throws InterruptedException {
+        Assert.assertTrue("Timed out waiting for signal on " + node,
+                latch.await(SIGNAL_TIMEOUT_SECONDS,
+                        java.util.concurrent.TimeUnit.SECONDS));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java b/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
new file mode 100644
index 0000000..bb4aa9c
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/EventGenerator.java
@@ -0,0 +1,63 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
+import org.apache.s4.dispatcher.partitioner.KeyInfo;
+import org.apache.s4.emitter.CommLayerEmitter;
+import org.apache.s4.schema.Schema;
+import org.apache.s4.serialize.KryoSerDeser;
+import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.LoadGenerator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Test helper that injects events into an S4 application through a
+ * {@link CommLayerEmitter}.
+ */
+public class EventGenerator {
+
+    private CommLayerEmitter eventEmitter;
+
+    /**
+     * Creates and initializes an emitter for application "s4", listener
+     * application "s4" and cluster manager address "localhost", using Kryo
+     * serialization.
+     */
+    public EventGenerator() {
+        SerializerDeserializer serDeser = new KryoSerDeser();
+
+        eventEmitter = new CommLayerEmitter();
+        eventEmitter.setAppName("s4");
+        eventEmitter.setListenerAppName("s4");
+        eventEmitter.setClusterManagerAddress("localhost");
+        eventEmitter
+                .setSenderId(String.valueOf(System.currentTimeMillis() / 1000));
+        eventEmitter.setSerDeser(serDeser);
+        eventEmitter.init();
+
+        // NOTE(review): this generator is never used after construction;
+        // kept because its side effects are not visible from this class.
+        LoadGenerator generator = new LoadGenerator();
+        generator.setEventEmitter(eventEmitter);
+    }
+
+    /**
+     * Converts the given pair into an event and emits it.
+     * <p>
+     * The intermediate JSON record is built with {@link JSONObject#put}
+     * rather than by concatenating text, so keys and values that are empty,
+     * contain JSON punctuation, or look like numbers or booleans are passed
+     * on unchanged as strings.
+     *
+     * @param keyValue    pair to inject
+     * @param streamName  name of the stream the event is emitted on
+     * @param partitionId partition the event is emitted to
+     * @throws JSONException if the JSON record cannot be built
+     */
+    public void injectValueEvent(KeyValue keyValue, String streamName,
+            int partitionId) throws JSONException {
+
+        Schema schema = new Schema(KeyValue.class);
+        JSONObject jsonRecord = new JSONObject();
+        jsonRecord.put("key", jsonValue(keyValue.getKey()));
+        jsonRecord.put("value", jsonValue(keyValue.getValue()));
+        Object event = LoadGenerator.makeRecord(jsonRecord, schema);
+        CompoundKeyInfo compoundKeyInfo = new CompoundKeyInfo();
+        compoundKeyInfo.setCompoundKey("key");
+        compoundKeyInfo.setCompoundValue("value");
+        List<CompoundKeyInfo> compoundKeyInfos = new ArrayList<CompoundKeyInfo>();
+        compoundKeyInfos.add(compoundKeyInfo);
+        EventWrapper eventWrapper = new EventWrapper(streamName, event,
+                compoundKeyInfos);
+        eventEmitter.emit(partitionId, eventWrapper);
+    }
+
+    /**
+     * Wraps an already built event and emits it.
+     *
+     * @param event            event object to emit
+     * @param streamName       name of the stream the event is emitted on
+     * @param partitionId      partition the event is emitted to
+     * @param compoundKeyInfos key information attached to the event
+     */
+    public void injectEvent(Object event, String streamName, int partitionId,
+            List<CompoundKeyInfo> compoundKeyInfos) throws JSONException {
+
+        EventWrapper eventWrapper = new EventWrapper(streamName, event,
+                compoundKeyInfos);
+        eventEmitter.emit(partitionId, eventWrapper);
+    }
+
+    /**
+     * Maps a null string to the JSON null marker, so that the field is still
+     * present in the record (put() with a plain null would remove it).
+     */
+    private static Object jsonValue(String text) {
+        return text == null ? JSONObject.NULL : text;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java b/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
new file mode 100644
index 0000000..e94ae2f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/KeyValue.java
@@ -0,0 +1,33 @@
+package org.apache.s4.ft;
+
+/**
+ * Mutable pair of strings with bean-style accessors and a no-argument
+ * constructor.
+ */
+public class KeyValue {
+
+    /** Key part of the pair; null until set. */
+    String key;
+    /** Value part of the pair; null until set. */
+    String value;
+
+    /** Creates a pair with neither key nor value set. */
+    public KeyValue() {
+    }
+
+    /**
+     * Creates a pair holding the given key and value.
+     *
+     * @param key   key to store
+     * @param value value to store
+     */
+    public KeyValue(String key, String value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    /** @return the stored key */
+    public String getKey() {
+        return this.key;
+    }
+
+    /** @param key replacement for the stored key */
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    /** @return the stored value */
+    public String getValue() {
+        return this.value;
+    }
+
+    /** @param value replacement for the stored value */
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
new file mode 100644
index 0000000..1ca5652
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -0,0 +1,113 @@
+package org.apache.s4.ft;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a processing element restarted in a new process recovers the
+ * state it had at its last checkpoint.
+ * <p>
+ * The S4 application runs in a forked process; the test and the processing
+ * element coordinate through ZooKeeper nodes and a data file.
+ */
+public class RecoveryTest extends S4TestCase {
+
+    public static long ZOOKEEPER_PORT = 21810;
+
+    /** Longest time, in seconds, the test waits for one ZooKeeper signal. */
+    private static final long SIGNAL_TIMEOUT_SECONDS = 10;
+
+    private Process forkedS4App = null;
+    private static Factory zookeeperServerConnectionFactory = null;
+
+    /**
+     * Cleans the temporary directories, starts the ZooKeeper server and
+     * removes the coordination nodes possibly left over by a previous run.
+     */
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            deleteIfPresent(zk, "/value1Set");
+            // FIXME can't figure out where this is retained
+            deleteIfPresent(zk, "/value2Set");
+            // FIXME can't figure out where this is retained
+            deleteIfPresent(zk, "/checkpointed");
+        } finally {
+            zk.close();
+        }
+    }
+
+    /**
+     * Stops the ZooKeeper server and kills the forked S4 application.
+     * <p>
+     * The kill happens in a <code>finally</code> block so that a failure
+     * while stopping ZooKeeper cannot leave the forked process running.
+     */
+    @After
+    public void cleanup() throws Exception {
+        try {
+            TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        } finally {
+            TestUtils.killS4App(forkedS4App);
+        }
+    }
+
+    @Test
+    public void testCheckpointRestorationThroughApplicationEvent()
+            throws Exception {
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            // 1. instantiate remote S4 app
+            forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                    "s4_core_conf_fs_backend.xml");
+            // TODO synchro
+            Thread.sleep(4000);
+
+            // 2. generate a simple event that sets value1 on the PE
+            CountDownLatch signalValue1Set = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+            EventGenerator gen = new EventGenerator();
+            gen.injectValueEvent(new KeyValue("value1", "message1"),
+                    "Stream1", 0);
+            awaitSignal("/value1Set", signalValue1Set);
+
+            // 3. trigger checkpoint: value1=message1 is what gets saved
+            final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/checkpointed",
+                    signalCheckpointed, zk);
+            gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+                    "Stream1", 0);
+            awaitSignal("/checkpointed", signalCheckpointed);
+            // signalCheckpointAddedByBK.await();
+
+            // 4. change value1 again; this newer value is NOT checkpointed
+            signalValue1Set = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+            gen.injectValueEvent(new KeyValue("value1", "message1b"),
+                    "Stream1", 0);
+            awaitSignal("/value1Set", signalValue1Set);
+            Assert.assertEquals("value1=message1b ; value2=",
+                    TestUtils.readFile(StatefulTestPE.DATA_FILE));
+
+            Thread.sleep(2000);
+            // 5. kill app and restart it in a new process
+            forkedS4App.destroy();
+            // S4App.killS4App(getClass().getName());
+
+            StatefulTestPE.DATA_FILE.delete();
+
+            forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                    "s4_core_conf_fs_backend.xml");
+            // TODO synchro
+            Thread.sleep(2000);
+
+            // 6. trigger recovery by sending application event to set
+            // value 2. The PE should end up with value1 from recovery and
+            // value2 from the injected event.
+            CountDownLatch signalValue2Set = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+
+            gen.injectValueEvent(new KeyValue("value2", "message2"),
+                    "Stream1", 0);
+            awaitSignal("/value2Set", signalValue2Set);
+
+            // we should get "message1" (checkpointed) instead of "message1b"
+            // (latest)
+            Assert.assertEquals("value1=message1 ; value2=message2",
+                    TestUtils.readFile(StatefulTestPE.DATA_FILE));
+        } finally {
+            // release the client connection even when an assertion fails
+            zk.close();
+        }
+    }
+
+    /**
+     * Best-effort removal of a coordination node: the node usually does not
+     * exist, so any failure is deliberately ignored.
+     */
+    private static void deleteIfPresent(ZooKeeper zk, String path) {
+        try {
+            zk.delete(path, -1);
+        } catch (Exception ignored) {
+            // nothing to clean up, or cleanup not possible: carry on
+        }
+    }
+
+    /**
+     * Waits for the given latch and fails the test, rather than blocking
+     * forever or silently carrying on, when the signal never arrives.
+     *
+     * @param node  ZooKeeper path the latch is attached to (used in the
+     *              failure message only)
+     * @param latch latch counted down when the signal is received
+     */
+    private static void awaitSignal(String node, CountDownLatch latch)
+            throws InterruptedException {
+        Assert.assertTrue("Timed out waiting for signal on " + node,
+                latch.await(SIGNAL_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/S4App.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4App.java b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
new file mode 100644
index 0000000..bd4a6df
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4App.java
@@ -0,0 +1,201 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+import org.apache.s4.processor.PEContainer;
+import org.apache.s4.util.Watcher;
+import org.apache.s4.util.clock.Clock;
+import org.apache.s4.util.clock.EventClock;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.StringWriter;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.log4j.Logger;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.FileSystemXmlApplicationContext;
+import org.springframework.core.io.ClassPathResource;
+
+/**
+ *
+ *
+ */
+public class S4App {
+
+ String configType = "typical";
+ long seedTime = 0;
+ ApplicationContext appContext = null;
+ ApplicationContext adapterContext = null;
+ private String configBase;
+ boolean configPathsInitialized = false;
+ private String[] coreConfigFileUrls;
+ private Class testClass;
+ private String s4CoreConfFileName;
+ public static File DEFAULT_TEST_OUTPUT_DIR = new File(
+ System.getProperty("user.dir") + File.separator + "tmp");
+ public static File DEFAULT_STORAGE_DIR = new File(
+ DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ + "storage");
+
+ public static String lockDirPath = System.getProperty("user.dir")
+ + File.separator + "tmp" + File.separator + "lock";
+
+ private S4App() {}
+
+ public S4App(Class testClass, String s4CoreConfFileName) throws Exception {
+ this.testClass = testClass;
+ this.s4CoreConfFileName = s4CoreConfFileName;
+ initConfigPaths(testClass, s4CoreConfFileName);
+ }
+ /**
+ * @param args
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ Class testClass = Class.forName(args[0]);
+ String s4CoreConfFile = args[1];
+ S4App app = new S4App(testClass, s4CoreConfFile);
+ S4TestCase.initS4Parameters();
+ app.initializeS4App();
+
+ }
+
+ /**
+ * Performs dependency injection and starts the S4 plaftform.
+ */
+ public void initializeS4App()
+ throws Exception {
+ initConfigPaths(testClass, s4CoreConfFileName);
+ ApplicationContext coreContext = null;
+
+ coreContext = new FileSystemXmlApplicationContext(coreConfigFileUrls,
+ coreContext);
+ ApplicationContext context = coreContext;
+
+ Clock clock = (Clock) context.getBean("clock");
+ if (clock instanceof EventClock && seedTime > 0) {
+ EventClock s4EventClock = (EventClock) clock;
+ s4EventClock.updateTime(seedTime);
+ System.out.println("Intializing event clock time with seed time "
+ + s4EventClock.getCurrentTime());
+ }
+
+ PEContainer peContainer = (PEContainer) context.getBean("peContainer");
+
+ Watcher w = (Watcher) context.getBean("watcher");
+ w.setConfigFilename(configBase + s4CoreConfFileName);
+
+ // load extension modules
+ // String[] configFileNames = getModuleConfigFiles(extsHome, prop);
+ // if (configFileNames.length > 0) {
+ // String[] configFileUrls = new String[configFileNames.length];
+ // for (int i = 0; i < configFileNames.length; i++) {
+ // configFileUrls[i] = "file:" + configFileNames[i];
+ // }
+ // context = new FileSystemXmlApplicationContext(configFileUrls,
+ // context);
+ // }
+
+ // load application modules
+ String applicationConfigFileName = configBase + "app_conf.xml";
+ String[] configFileUrls = new String[] { "file:"
+ + applicationConfigFileName };
+ context = new FileSystemXmlApplicationContext(configFileUrls, context);
+ // attach any beans that implement ProcessingElement to the PE
+ // Container
+ String[] processingElementBeanNames = context
+ .getBeanNamesForType(AbstractPE.class);
+ for (String processingElementBeanName : processingElementBeanNames) {
+ Object bean = context.getBean(processingElementBeanName);
+ try {
+ Method getS4ClockMethod = bean.getClass().getMethod(
+ "getClock");
+
+ if (getS4ClockMethod.getReturnType().equals(Clock.class)) {
+ if (getS4ClockMethod.invoke(bean) == null) {
+ Method setS4ClockMethod = bean.getClass().getMethod(
+ "setClock", Clock.class);
+ setS4ClockMethod.invoke(bean,
+ coreContext.getBean("clock"));
+ }
+ }
+ ((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+ } catch (NoSuchMethodException mnfe) {
+ // acceptable
+ }
+ System.out.println("Adding processing element with bean name "
+ + processingElementBeanName + ", id "
+ + ((AbstractPE) bean).getId());
+ peContainer.addProcessor((AbstractPE) bean);
+ }
+
+ appContext = context;
+ }
+
+
+
+ private void initConfigPaths(Class testClass, String s4CoreConfFileName)
+ throws IOException {
+ if (!configPathsInitialized) {
+ S4TestCase.initS4Parameters();
+ String testDir = testClass.getPackage().getName()
+ .replace('.', File.separatorChar);
+
+ ClassPathResource propResource = new ClassPathResource(
+ "s4_core.properties");
+ Properties prop = new Properties();
+ if (propResource.exists()) {
+ prop.load(propResource.getInputStream());
+ } else {
+ System.err
+ .println("Unable to find s4_core.properties. It must be available in classpath");
+ Thread.dumpStack();
+ System.exit(12);
+ }
+
+ configBase = System.getProperty("user.dir") + File.separator
+ + "src" + File.separator + "test" + File.separator + "java"
+ + File.separator + testDir + File.separator;
+ String configPath = configBase + File.separatorChar
+ + "wall_clock.xml";
+ List<String> coreConfigUrls = new ArrayList<String>();
+ coreConfigUrls.add(configPath);
+
+ // load core config xml
+ if (s4CoreConfFileName != null) {
+ // may be null for adapter
+ configPath = configBase + s4CoreConfFileName;
+ File configFile = new File(configPath);
+ if (!configFile.exists()) {
+ System.err.printf(
+ "S4 core config file %s does not exist\n",
+ configPath);
+ Thread.dumpStack();
+ System.exit(13);
+ }
+ coreConfigUrls.add(configPath);
+ }
+ String[] coreConfigFiles = new String[coreConfigUrls.size()];
+ coreConfigUrls.toArray(coreConfigFiles);
+
+ coreConfigFileUrls = new String[coreConfigFiles.length];
+ for (int i = 0; i < coreConfigFiles.length; i++) {
+ coreConfigFileUrls[i] = "file:" + coreConfigFiles[i];
+ }
+ configPathsInitialized = true;
+
+ }
+ }
+
+ public void destroy() {
+ ((FileSystemXmlApplicationContext)appContext).close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java b/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
new file mode 100644
index 0000000..6095e4e
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/S4TestCase.java
@@ -0,0 +1,52 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Hashtable;
+import java.util.Map;
+
+import org.junit.BeforeClass;
+import org.springframework.context.ApplicationContext;
+
+public class S4TestCase {
+
+ String configType = "typical";
+ long seedTime = 0;
+ ApplicationContext appContext = null;
+ ApplicationContext adapterContext = null;
+ boolean configPathsInitialized = false;
+ public static File DEFAULT_TEST_OUTPUT_DIR = new File(
+ System.getProperty("user.dir") + File.separator + "tmp");
+ public static File DEFAULT_STORAGE_DIR = new File(
+ DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
+ + "storage");
+ // use a static map to track PE instances
+ public static final Map<Object, AbstractPE> registeredPEs = new Hashtable<Object, AbstractPE>();
+
+
+ @BeforeClass
+ public static void cleanLocks() {
+ TestUtils.cleanupTmpDirs();
+ }
+
+
+ @BeforeClass
+ public static void initS4Parameters() throws IOException {
+
+ System.setProperty("commlayer_mode", "static");
+ System.setProperty("commlayer.mode", "static");
+ System.setProperty("DequeueCount", "6");
+ System.setProperty("lock_dir", S4App.lockDirPath);
+ File lockDir = new File(S4App.lockDirPath);
+ if (!lockDir.exists()) {
+ if (!lockDir.mkdirs()) {
+ throw new RuntimeException("Cannot create directory: ["+lockDir.getAbsolutePath()+"]");
+ }
+ } else {
+ TestUtils.deleteDirectoryContents(lockDir);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java b/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
new file mode 100644
index 0000000..c38d8d3
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/SimpleEventProducer.java
@@ -0,0 +1,54 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.collector.EventWrapper;
+import org.apache.s4.listener.EventHandler;
+import org.apache.s4.listener.EventProducer;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SimpleEventProducer implements EventProducer {
+
+ private Set<org.apache.s4.listener.EventHandler> handlers = new HashSet<org.apache.s4.listener.EventHandler>();
+ private String streamName;
+
+ LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+
+ public void init() {
+ }
+
+ @Override
+ public void addHandler(EventHandler handler) {
+ handlers.add(handler);
+
+ }
+
+ @Override
+ public boolean removeHandler(EventHandler handler) {
+ return handlers.remove(handler);
+ }
+
+ public void setStreamName(String streamName) {
+ this.streamName = streamName;
+ }
+
+ public String getStreamName() {
+ return streamName;
+ }
+
+ // TODO JSON-like stuff
+ public void produceEvent(String message) {
+ EventWrapper ew = new EventWrapper(streamName, message, null);
+ for (org.apache.s4.listener.EventHandler handler : handlers) {
+ try {
+ handler.processEvent(ew);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
new file mode 100644
index 0000000..cfb9132
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
@@ -0,0 +1,136 @@
+package org.apache.s4.ft;
+
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class StatefulTestPE extends AbstractPE implements Watcher {
+
+ String id;
+ String value1 = "";
+ String value2 = "";
+ transient ZooKeeper zk = null;
+ transient public static File DATA_FILE = new File(
+ System.getProperty("user.dir")
+ + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void processEvent(KeyValue event) {
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
+ S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
+ }
+ try {
+
+ if ("value1".equals(event.getKey())) {
+ setValue1(event.getValue());
+ zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else if ("value2".equals(event.getKey())) {
+ setValue2(event.getValue());
+ zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else if ("initiateCheckpoint".equals(event.getKey())) {
+ initiateCheckpoint();
+ } else {
+ throw new RuntimeException("unidentified event: " + event);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public String getValue1() {
+ return value1;
+ }
+
+ public void setValue1(String value1) {
+ this.value1 = value1;
+ persistValues();
+ }
+
+ public String getValue2() {
+ return value2;
+ }
+
+ public void setValue2(String value2) {
+ this.value2 = value2;
+ persistValues();
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ protected void checkpoint() {
+ super.checkpoint();
+ try {
+ zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ // NOTE: we use a file as a simple way to keep track of changes.
+ // I/O failures are rethrown unchecked, with the original IOException attached as the cause.
+ private void persistValues() {
+ if (DATA_FILE.exists()) {
+ if (!DATA_FILE.delete()) {
+ throw new RuntimeException("Cannot delete datafile "
+ + DATA_FILE.getAbsolutePath());
+ }
+ }
+ try {
+ if (!DATA_FILE.createNewFile()) {
+ throw new RuntimeException("Cannot create datafile "
+ + DATA_FILE.getAbsolutePath());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create datafile "
+ + DATA_FILE.getAbsolutePath(), e);
+ }
+ try {
+ TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
+ DATA_FILE);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot write to datafile "
+ + DATA_FILE.getAbsolutePath(), e);
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java b/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
new file mode 100644
index 0000000..f3e620c
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestRedisStateStorage.java
@@ -0,0 +1,146 @@
+package org.apache.s4.ft;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.s4.ft.SafeKeeper.StorageResultCode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestRedisStateStorage {
+
+ private static Process redisDaemon;
+ private static final String PAYLOAD = "payload";
+ private RedisStateStorage storage;
+
+ /**
+ * Starts the redis-server binary matching this platform once for the whole test class, then
+ * reads at most 4 lines of its output, stopping early once it reports being ready on port 6379.
+ *
+ * @throws IOException if the server process cannot be started or its output cannot be read
+ */
+ @BeforeClass
+ public static void runRedis() throws IOException {
+ List<String> cmdList = new ArrayList<String>();
+
+ cmdList.add(findCompiledRedisForPlatform());
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+ pb.directory(new File(System.getProperty("user.dir")));
+ // redirectErrorStream() without argument is only a getter; pass true so that
+ // stderr is actually merged into the stdout stream read below
+ pb.redirectErrorStream(true);
+ redisDaemon = pb.start();
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ redisDaemon.getInputStream()));
+ String s;
+ int maxLinesBeforeOK= 4;
+ for (int i = 0; i < maxLinesBeforeOK; i++) {
+ if ((s=br.readLine())!=null) {
+ if (s.contains("The server is now ready to accept connections on port 6379")) {
+ break;
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ private static String findCompiledRedisForPlatform() {
+ // TODO add compiled versions for other platforms/architectures
+ String osName = System.getProperty("os.name");
+ String osArch = System.getProperty("os.arch");
+ if (osName.equalsIgnoreCase("Mac OS X")) {
+ if (osArch.equalsIgnoreCase("x86_64")) {
+ return "src/test/resources/macosx/redis-server-64bits";
+ }
+ }
+ if (osName.equalsIgnoreCase("Linux")) {
+ if (osArch.equalsIgnoreCase("i386")) {
+ return "src/test/resources/linux/redis-server-32bits";
+ }
+ }
+ if (!new File(System.getProperty("user.dir")
+ + "/src/test/resources/redis-server").exists()) {
+ Assert.fail("Could not find a redis server executable for your platform. Please place an executable redis server version compiled for your platform in s4-core/src/test/resources, named 'redis-server'");
+ }
+ return "src/test/resources/redis-server";
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ storage = clearRedis();
+ }
+
+ public static RedisStateStorage clearRedis() {
+ RedisStateStorage storage = new RedisStateStorage();
+ storage.setRedisHost("localhost");
+ storage.setRedisPort(6379);
+ storage.init();
+ storage.clear();
+ return storage;
+ }
+
+ @Test
+ public void testFetchState() throws IOException, InterruptedException {
+ SafeKeeperId key = new SafeKeeperId("prototype", "key");
+ final CountDownLatch signalAllSaved = new CountDownLatch(1);
+ storage.saveState(key, PAYLOAD.getBytes(), new StorageCallback() {
+ @Override
+ public void storageOperationResult(StorageResultCode resultCode, Object message) {
+ signalAllSaved.countDown();
+ }
+ });
+ // await() returns false on timeout: fail here rather than on a state that was never saved
+ Assert.assertTrue("state was not saved within 5s", signalAllSaved.await(5, TimeUnit.SECONDS));
+ byte[] result = storage.fetchState(key);
+ assertEquals(PAYLOAD, new String(result));
+ }
+
+ @Test
+ public void testFetchStoredKeys() throws InterruptedException {
+ Set<SafeKeeperId> fixture = new HashSet<SafeKeeperId>();
+ for (int i = 0; i < 10; i++) {
+ fixture.add(new SafeKeeperId("prototype", "key" + i));
+ }
+ final CountDownLatch signalAllSaved = new CountDownLatch(fixture.size());
+ for (SafeKeeperId skid : fixture) {
+ storage.saveState(skid, PAYLOAD.getBytes(), new StorageCallback() {
+ @Override
+ public void storageOperationResult(StorageResultCode resultCode, Object message) {
+ signalAllSaved.countDown();
+ }
+ });
+ }
+ // await() returns false on timeout: fail here rather than on a partially saved key set
+ Assert.assertTrue("states were not all saved within 5s", signalAllSaved.await(5, TimeUnit.SECONDS));
+ // retrieve the keys
+ Set<SafeKeeperId> result = storage.fetchStoredKeys();
+ assertEquals(fixture.size(), result.size());
+ assertEquals(fixture, result);
+ }
+
+ @AfterClass
+ public static void stopRedis() throws InterruptedException {
+ redisDaemon.destroy();
+ int exitcode = redisDaemon.waitFor();
+ if (exitcode != 0)
+ System.out.println("Redis exited with non zero exit code: "
+ + exitcode);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
new file mode 100644
index 0000000..84af34f
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -0,0 +1,399 @@
+package org.apache.s4.ft;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Contains static methods that can be used in tests for things such as:
+ * - files utilities: strings <-> files conversion, directory recursive delete etc...
+ * - starting local instances for zookeeper and bookkeeper
+ * - distributed latches through zookeeper
+ * - etc...
+ *
+ */
+public class TestUtils {
+
+ public static final int ZK_PORT = 21810;
+ /**
+ * Forks a JVM running S4App for the given test class and S4 core configuration file.
+ * stderr of the child is merged into stdout, which a background thread echoes to System.out.
+ */
+ public static Process forkS4App(String testClassName, String s4CoreConfFileName) throws IOException,
+ InterruptedException {
+ List<String> cmdList = new ArrayList<String>();
+ cmdList.add("java");
+ cmdList.add("-cp");
+ cmdList.add(System.getProperty("java.class.path"));
+ cmdList.add("-Dcommlayer_mode=static");
+ cmdList.add("-Dcommlayer.mode=static");
+ cmdList.add("-Dlock_dir=" + S4App.lockDirPath);
+ cmdList.add("-Dlog4j.configuration=file://"
+ + System.getProperty("user.dir")
+ + "/src/test/resources/log4j.xml");
+// cmdList.add("-Xdebug");
+// cmdList.add("-Xnoagent");
+// cmdList.add("-Xrunjdwp:transport=dt_socket,address=8788,server=y,suspend=n");
+ cmdList.add(S4App.class.getName());
+ cmdList.add(testClassName);
+ cmdList.add(s4CoreConfFileName);
+
+ ProcessBuilder pb = new ProcessBuilder(cmdList);
+ pb.directory(new File(System.getProperty("user.dir")));
+ // the no-arg redirectErrorStream() is only a getter: pass true to really merge stderr into stdout
+ pb.redirectErrorStream(true);
+ final Process process = pb.start();
+ // TODO some synchro with s4 platform ready state
+ Thread.sleep(2500);
+ new Thread(new Runnable() {
+ public void run() {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ process.getInputStream()));
+ String line;
+ try {
+ line = br.readLine();
+ while (line != null) {
+ System.out.println(line);
+ line = br.readLine();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+
+ return process;
+ }
+
+ public static void killS4App(Process forkedApp) throws IOException,
+ InterruptedException {
+ if (forkedApp != null) {
+ forkedApp.destroy();
+ }
+ }
+
+ public static void writeStringToFile(String s, File f) throws IOException {
+ if (f.exists()) {
+ if (!f.delete()) {
+ throw new RuntimeException("Cannot delete file "
+ + f.getAbsolutePath());
+ }
+ }
+
+ FileWriter fw = null;
+ try {
+ if (!f.createNewFile()) {
+ throw new RuntimeException("Cannot create new file : "
+ + f.getAbsolutePath());
+ }
+ fw = new FileWriter(f);
+
+ fw.write(s);
+ } catch (IOException e) {
+ throw (e);
+ } finally {
+ if (fw != null) {
+ try {
+ fw.close();
+ } catch (IOException e) {
+ throw (e);
+ }
+ }
+ }
+ }
+
+ public static String readFile(File f) throws IOException {
+ BufferedReader br = null;
+ try {
+ br = new BufferedReader(new FileReader(f));
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ if (line != null) {
+ sb.append("\n");
+ }
+ }
+ return sb.toString();
+ } finally {
+ if (br != null) {
+ try {
+ br.close();
+ } catch (IOException e) {
+ throw (e);
+ }
+ }
+ }
+
+ }
+
+ public static NIOServerCnxn.Factory startZookeeperServer()
+ throws IOException, InterruptedException, KeeperException {
+
+ List<String> cmdList = new ArrayList<String>();
+ final File zkDataDir = new File(System.getProperty("user.dir")
+ + File.separator + "tmp" + File.separator + "zookeeper"
+ + File.separator + "data");
+ if (zkDataDir.exists()) {
+ TestUtils.deleteDirectoryContents(zkDataDir);
+ } else {
+ zkDataDir.mkdirs();
+ }
+
+ ZooKeeperServer zks = new ZooKeeperServer(zkDataDir, zkDataDir, 3000);
+ // SyncRequestProcessor.setSnapCount(1000);
+ // final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
+ NIOServerCnxn.Factory nioZookeeperConnectionFactory = new NIOServerCnxn.Factory(
+ new InetSocketAddress(ZK_PORT));
+ nioZookeeperConnectionFactory.startup(zks);
+ Assert.assertTrue("waiting for server being up",
+ waitForServerUp("localhost", ZK_PORT, 4000));
+ return nioZookeeperConnectionFactory;
+
+ }
+
+ public static void stopZookeeperServer(NIOServerCnxn.Factory f)
+ throws IOException, InterruptedException {
+ if (f != null) {
+ f.shutdown();
+ Assert.assertTrue("waiting for server down",
+ waitForServerDown("localhost", ZK_PORT, 3000));
+ }
+
+
+
+ // List<String> cmdList = new ArrayList<String>();
+ // cmdList.add(System.getProperty("user.dir")
+ // + "/src/test/scripts/killJavaProcessForPort.sh");
+ // cmdList.add("*:21810");
+ // // int zkPid = Integer.valueOf(readFileAsString(new File(System
+ // // .getProperty("user.dir")
+ // // + File.separator
+ // // + "tmp"
+ // // + File.separator + "zk.pid")));
+ // // cmdList.add(String.valueOf(zkPid));
+ // ProcessBuilder pb = new ProcessBuilder(cmdList);
+ // // pb.directory(new File(System.getProperty("user.dir")));
+ // pb.start().waitFor();
+
+ }
+
+ public static void deleteDirectoryContents(File dir) {
+ File[] files = dir.listFiles();
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectoryContents(file);
+ }
+ if (!file.delete()) {
+ throw new RuntimeException("could not delete : " + file);
+ }
+ }
+ }
+
+ /**
+ * Reads a whole text file into a string.
+ * <p>
+ * Delegates to {@link #readFile(File)}, which builds the same result
+ * (lines joined with "\n", no trailing newline) but closes the reader in
+ * a finally block; the previous copy of that loop here never closed its
+ * FileReader and leaked a file handle on every call.
+ *
+ * @param f file to read
+ * @return the file content, lines separated by "\n"
+ * @throws IOException if the file cannot be opened or read
+ */
+ public static String readFileAsString(File f) throws IOException {
+ return readFile(f);
+ }
+
+ // TODO factor this code (see BasicFSStateStorage) - or use commons io or
+ // guava
+ public static byte[] readFileAsByteArray(File file) throws Exception {
+ FileInputStream in = null;
+ try {
+ in = new FileInputStream(file);
+
+ long length = file.length();
+
+ /*
+ * Arrays can only be created using int types, so ensure that the
+ * file size is not too big before we downcast to create the array.
+ */
+ if (length > Integer.MAX_VALUE) {
+ throw new IOException("Error file is too large: "
+ + file.getName() + " " + length + " bytes");
+ }
+
+ byte[] buffer = new byte[(int) length];
+ int offSet = 0;
+ int numRead = 0;
+
+ while (offSet < buffer.length
+ && (numRead = in.read(buffer, offSet, buffer.length
+ - offSet)) >= 0) {
+ offSet += numRead;
+ }
+
+ if (offSet < buffer.length) {
+ throw new IOException("Error, could not read entire file: "
+ + file.getName() + " " + offSet + "/" + buffer.length
+ + " bytes read");
+ }
+
+ in.close();
+ return buffer;
+
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ }
+
+ public static ZooKeeper createZkClient() throws IOException {
+ final ZooKeeper zk = new ZooKeeper("localhost:" + ZK_PORT, 4000,
+ new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ }
+ });
+ return zk;
+ }
+
+ public static void watchAndSignalCreation(String path,
+ final CountDownLatch latch, final ZooKeeper zk)
+ throws KeeperException, InterruptedException {
+
+ if (zk.exists(path, false) != null) {
+ zk.delete(path, -1);
+ }
+ zk.exists(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (EventType.NodeCreated.equals(event.getType())) {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ public static void watchAndSignalChangedChildren(String path,
+ final CountDownLatch latch, final ZooKeeper zk)
+ throws KeeperException, InterruptedException {
+
+ zk.getChildren(path, new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ if (EventType.NodeChildrenChanged.equals(event.getType())) {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ // from zookeeper's codebase
+ public static boolean waitForServerUp(String host, int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ // if there are multiple hostports, just take the first one
+ String result = send4LetterWord(host, port, "stat");
+ if (result.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } catch (IOException ignored) {
+ // ignore as this is expected
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ // from zookeeper's codebase
+ public static String send4LetterWord(String host, int port, String cmd)
+ throws IOException {
+ Socket sock = new Socket(host, port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write(cmd.getBytes());
+ outstream.flush();
+ // this replicates NC - close the output stream before reading
+ sock.shutdownOutput();
+
+ reader = new BufferedReader(new InputStreamReader(
+ sock.getInputStream()));
+ StringBuilder sb = new StringBuilder();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ sb.append(line + "\n");
+ }
+ return sb.toString();
+ } finally {
+ sock.close();
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ // from zookeeper's codebase
+ public static boolean waitForServerDown(String host, int port, long timeout) {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ send4LetterWord(host, port, "stat");
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ return false;
+ }
+
+ public static void cleanupTmpDirs() {
+ if (S4TestCase.DEFAULT_TEST_OUTPUT_DIR.exists()) {
+ deleteDirectoryContents(S4TestCase.DEFAULT_TEST_OUTPUT_DIR);
+ }
+ S4TestCase.DEFAULT_STORAGE_DIR.mkdirs();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/adapter.properties b/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
new file mode 100644
index 0000000..ae5169d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/adapter.properties
@@ -0,0 +1,2 @@
+appName=s4
+listenerAppName=s4
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
new file mode 100644
index 0000000..f19c1e7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/adapter_conf.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="location">
+ <value>classpath:adapter.properties</value>
+ </property>
+ <property name="ignoreUnresolvablePlaceholders" value="true"/>
+ </bean>
+
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter" init-method="init">
+ <property name="serDeser" ref="serDeser"/>
+ <property name="appName" value="${appName}"/>
+ <property name="listenerAppName" value="${listenerAppName}"/>
+ <property name="monitor" ref="monitor"/>
+ </bean>
+
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser"/>
+
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true" init-method="init">
+ <property name="flushInterval" value="30"/>
+ <property name="loggerName" value="monitor"/>
+ </bean>
+
+ <bean id="dummyPartitioner" class="org.apache.s4.dispatcher.partitioner.DummyPartitioner"/>
+
+ <bean id="dispatcher" class="org.apache.s4.dispatcher.Dispatcher" init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="dummyPartitioner"/>
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ <property name="loggerName" value="s4"/>
+ </bean>
+
+ <bean id="adapter" class="org.apache.s4.adapter.Adapter"
+ init-method="init">
+ <property name="dispatcher" ref="dispatcher"/>
+ </bean>
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.out.xml
new file mode 100644
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
new file mode 100644
index 0000000..abdf03e
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/app_adapter_conf.xml
@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans
+ http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+
+ <bean id="eventProducer" class="org.apache.s4.ft.SimpleEventProducer"
+ init-method="init">
+ <property name="streamName" value="Default"/>
+ </bean>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
new file mode 100644
index 0000000..3eec6ec
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/app_conf.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <!-- <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
+ <property name="id" value="printEventPE"/>
+ <property name="keys">
+ <list>
+ <value>TopicSeen topic</value>
+ </list>
+ </property>
+ </bean> -->
+
+ <bean id="statefulPE" class="org.apache.s4.ft.StatefulTestPE">
+ <property name="id" value="statefulPE"/>
+ <property name="keys">
+ <list>
+ <value>Stream1 key</value>
+ </list>
+ </property>
+ <!-- we set the frequency to 1000 so that checkpointing does NOT get triggered automatically! -->
+ <property name="checkpointingFrequencyByEventCount" value="1000" />
+ </bean>
+
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
new file mode 100644
index 0000000..6b35e97
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_bk_backend.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <bean id="propertyConfigurer"
+ class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="location">
+ <value>classpath:s4_core.properties</value>
+ </property>
+ <property name="properties">
+ <props>
+ <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+ <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+ </props>
+ </property>
+ <property name="ignoreUnresolvablePlaceholders" value="true" />
+ </bean>
+
+ <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+ <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="listenerAppName" value="${adapter_app_name}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+ <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+ <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+ </bean>
+
+ <!--START: Dispatchers for control event processor. If stream name in Response
+ is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+ Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+ <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+ <property name="dispatchers">
+ <list>
+ <ref bean="ctrlDispatcherFilteredS4" />
+ <ref bean="ctrlDispatcherFilteredAdapter" />
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherS4" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+ <!-- END: Dispatchers for control events -->
+
+ <!-- Control Events handler -->
+ <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+ <property name="dispatcher" ref="ctrlDispatcher" />
+ </bean>
+
+ <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+ init-method="init" lazy-init="true">
+ <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ <property name="trackByKey" value="true" />
+ <property name="clock" ref="clock" />
+ <property name="controlEventProcessor" ref="ctrlHandler" />
+ <property name="safeKeeper" ref="safeKeeper" />
+ </bean>
+
+ <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="clusterManagerAddress" value="${zk_address}" />
+ <property name="appName" value="${s4_app_name}" />
+ <property name="maxQueueSize" value="${listener_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+ init-method="init">
+ <property name="rawListener" ref="rawListener" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+ init-method="init">
+ <property name="flushInterval" value="30" />
+ <property name="loggerName" value="monitor" />
+ </bean>
+
+ <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+ lazy-init="true">
+ <property name="monitor" ref="monitor" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="minimumMemory" value="52428800" />
+ </bean>
+
+
+
+
+ <!-- Some useful beans related to client-adapter for apps -->
+
+ <!-- Dispatcher to send to all adapter nodes. -->
+ <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="broadcastPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner to achieve broadcast -->
+ <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+ <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="loopbackPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ </bean>
+
+ <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+ <property name="stateStorage" ref="bkStateStorage" />
+ <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+ <property name="serializer" ref="serDeser"/>
+ <property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+ </bean>
+
+ <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+ <bean id="bkStateStorage" class="org.apache.s4.ft.BookKeeperStateStorage" init-method="init">
+ <!-- if not specified, default is <current_dir>/tmp/storage
+ <property name="storageRootPath" value="${storage_root_path}" /> -->
+ <property name="zkServers" value="localhost:21810"/>
+ </bean>
+
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..57d67d7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <!-- Resolves ${...} placeholders used throughout this file. Values are read
+ from s4_core.properties on the classpath; the inline props supply the two
+ kryoSerDeser buffer sizes. Placeholders that cannot be resolved do not
+ abort context start-up. -->
+ <bean id="propertyConfigurer"
+   class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+   <property name="location" value="classpath:s4_core.properties" />
+   <property name="properties">
+     <props>
+       <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+       <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+     </props>
+   </property>
+   <property name="ignoreUnresolvablePlaceholders">
+     <value>true</value>
+   </property>
+ </bean>
+
+ <!-- Shared hasher, injected into genericPartitioner and safeKeeper. -->
+ <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+ <!-- Emitter configured with listenerAppName=${adapter_app_name}; used by the
+ dispatchers that send events to the adapter. -->
+ <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="listenerAppName" value="${adapter_app_name}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <!-- Default emitter (no listenerAppName set); used by ctrlDispatcherS4,
+ loopbackDispatcher and loopbackPartitioner. -->
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <!-- Serializer shared by emitters, listener and safeKeeper. Buffer sizes come
+ from the kryoSerDeser.* placeholders (2048 and 262144 are supplied by
+ propertyConfigurer). -->
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+ <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+ <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+ </bean>
+
+ <!--START: Dispatchers for control event processor. ctrlDispatcher fans out
+ to two filtered dispatchers, both keyed on the stream name
+ @${adapter_app_name}: ctrlDispatcherFilteredAdapter selects that stream and
+ forwards to the adapter (via ctrlDispatcherAdapter), while
+ ctrlDispatcherFilteredS4 excludes it and forwards to the S4 cluster itself
+ (via ctrlDispatcherS4). -->
+ <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+ <property name="dispatchers">
+ <list>
+ <ref bean="ctrlDispatcherFilteredS4" />
+ <ref bean="ctrlDispatcherFilteredAdapter" />
+ </list>
+ </property>
+ </bean>
+
+ <!-- Wraps ctrlDispatcherAdapter, configured with the single stream
+ @${adapter_app_name}. -->
+ <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <!-- Wraps ctrlDispatcherS4, configured with the same stream list as
+ ctrlDispatcherFilteredAdapter (@${adapter_app_name}) but using the
+ excluding dispatcher class. -->
+ <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherS4" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <!-- Partitioner shared by both control dispatchers; no stream names or hash
+ keys are configured here. -->
+ <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <!-- Control dispatcher towards the S4 cluster: genericPartitioner plus the
+ default commLayerEmitter. -->
+ <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Control dispatcher towards the adapter: genericPartitioner plus
+ commLayerEmitterToAdapter. -->
+ <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+ <!-- END: Dispatchers for control events -->
+
+ <!-- Control events handler; sends its output through ctrlDispatcher and is
+ injected into peContainer as controlEventProcessor. -->
+ <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+ <property name="dispatcher" ref="ctrlDispatcher" />
+ </bean>
+
+ <!-- Lazily initialised PE container. Queue size comes from
+ ${pe_container_max_queue_size}; the clock bean is not defined in this file
+ and must be provided by another context file. safeKeeper enables the fault
+ tolerance wiring under test. -->
+ <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+ init-method="init" lazy-init="true">
+ <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ <property name="trackByKey" value="true" />
+ <property name="clock" ref="clock" />
+ <property name="controlEventProcessor" ref="ctrlHandler" />
+ <property name="safeKeeper" ref="safeKeeper" />
+ </bean>
+
+ <!-- Listener shared by both emitters and by eventListener. Cluster manager
+ address, application name and queue size are all taken from placeholders. -->
+ <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="clusterManagerAddress" value="${zk_address}" />
+ <property name="appName" value="${s4_app_name}" />
+ <property name="maxQueueSize" value="${listener_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <!-- Connects rawListener to peContainer. -->
+ <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+ init-method="init">
+ <property name="rawListener" ref="rawListener" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <!-- Lazily initialised monitor shared by most beans in this file; logs to
+ the logger named monitor. NOTE(review): the unit of flushInterval (30) is not
+ visible here; presumably seconds, confirm against Log4jMonitor. -->
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+ init-method="init">
+ <property name="flushInterval" value="30" />
+ <property name="loggerName" value="monitor" />
+ </bean>
+
+ <!-- Lazily initialised watcher over peContainer. minimumMemory is 52428800,
+ i.e. 50 * 1024 * 1024 (presumably bytes, confirm against Watcher). -->
+ <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+ lazy-init="true">
+ <property name="monitor" ref="monitor" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="minimumMemory" value="52428800" />
+ </bean>
+
+
+
+
+ <!-- Some useful beans related to client-adapter for apps -->
+
+ <!-- Dispatcher to send to all adapter nodes: broadcastPartitioner combined
+ with commLayerEmitterToAdapter. -->
+ <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="broadcastPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner to achieve broadcast; used by dispatcherToClientAdapters. -->
+ <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+ <!-- Dispatcher wired with loopbackPartitioner as its only partitioner;
+ events are emitted through commLayerEmitter. Injected into safeKeeper. -->
+ <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="loopbackPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner used by loopbackDispatcher; it is given the same emitter
+ (commLayerEmitter) as the dispatcher itself. -->
+ <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ </bean>
+
+ <!-- Fault tolerance entry point: wired to the file system state storage,
+ the loopback dispatcher, the shared serializer and hasher, and a storage
+ callback factory. Injected into peContainer. -->
+ <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+ <property name="stateStorage" ref="fsStateStorage" />
+ <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+ <property name="serializer" ref="serDeser"/>
+ <property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+ </bean>
+
+ <!-- Storage callback factory injected into safeKeeper. -->
+ <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+ <!-- File system state storage referenced by safeKeeper. No properties are
+ set, so the storage root falls back to the default described below. -->
+ <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+ <!-- if not specified, default is <current_dir>/tmp/storage
+ <property name="storageRootPath" value="${storage_root_path}" /> -->
+ </bean>
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <!-- Clock bean; peContainer in the core configuration references it by the
+ id clock. -->
+ <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
new file mode 100644
index 0000000..37a68d1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -0,0 +1,159 @@
+package org.apache.s4.ft.wordcount;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestRedisStateStorage;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.wordcount.WordCountTest;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Fault-tolerance test for the word counting s4 application.
+ *
+ * We inject 3 sentences, one at a time, into a word counting s4 system.
+ *
+ * After each of the first two sentences has been processed, we just kill the
+ * platform and restart it, then inject the next sentence.
+ *
+ * We verify that no state was lost, i.e. that the final word counts include
+ * words from all the sentences.
+ *
+ * NOTE 1: we synchronize through zookeeper to control when to kill the
+ * application, and when to verify assertions.
+ *
+ * NOTE 2: we use an additional explicit 1s wait after each sentence
+ * (originally added for the bookkeeper backend); presumably it also lets
+ * pending checkpoints complete before the crash - TODO confirm.
+ */
+public class FTWordCountTest extends S4TestCase {
+
+    private static Factory zookeeperServerConnectionFactory;
+    private static final String FILESYSTEM_BACKEND_CONF = "s4_core_conf_fs_backend.xml";
+    private static final String REDIS_BACKEND_CONF = "s4_core_conf_redis_backend.xml";
+    /** Maximum time to wait for a zookeeper synchronization signal. */
+    private static final long SIGNAL_TIMEOUT_SECONDS = 10;
+    private Process forkedS4App = null;
+
+    /** Checkpointing and recovery using the file system backend. */
+    @Test
+    public void testFileSystemBackend() throws Exception {
+        doTestCheckpointingAndRecovery(FILESYSTEM_BACKEND_CONF);
+    }
+
+    /** Checkpointing and recovery using the redis backend. */
+    @Test
+    public void testRedisBackend() throws Exception {
+        TestRedisStateStorage.runRedis();
+        try {
+            TestRedisStateStorage.clearRedis();
+            doTestCheckpointingAndRecovery(REDIS_BACKEND_CONF);
+        } finally {
+            // always stop redis, even when the test body fails, so that it
+            // does not leak into subsequent tests
+            TestRedisStateStorage.stopRedis();
+        }
+    }
+
+    @Before
+    public void prepare() throws Exception {
+        TestUtils.cleanupTmpDirs();
+        S4TestCase.initS4Parameters();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        try {
+            TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+        } finally {
+            // the forked process must be killed even if stopping zookeeper
+            // fails
+            if (forkedS4App != null) {
+                forkedS4App.destroy();
+            }
+        }
+    }
+
+    /**
+     * Sends 1 sentence, waits for all words to be processed, then crashes the
+     * app. This is done for 3 sentences, in order to make sure that recovery
+     * does not introduce side effects.
+     *
+     * @param backendConf
+     *            name of the s4 core configuration file selecting the state
+     *            storage backend
+     * @throws Exception
+     *             if synchronization, process forking or an assertion fails
+     */
+    public void doTestCheckpointingAndRecovery(String backendConf)
+            throws Exception {
+        final ZooKeeper zk = TestUtils.createZkClient();
+        try {
+            final int lastWordOfSentence1 = WordCountTest.SENTENCE_1_TOTAL_WORDS;
+            final int lastWordOfSentence2 = WordCountTest.SENTENCE_1_TOTAL_WORDS
+                    + WordCountTest.SENTENCE_2_TOTAL_WORDS;
+
+            forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                    backendConf);
+
+            CountDownLatch signalTextProcessed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/textProcessed",
+                    signalTextProcessed, zk);
+            EventGenerator gen = new EventGenerator();
+
+            // add authorizations for processing
+            authorizeProcessing(zk, 1, lastWordOfSentence1);
+            CountDownLatch signalSentence1Processed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/classifierIteration_"
+                    + lastWordOfSentence1, signalSentence1Processed, zk);
+            gen.injectValueEvent(new KeyValue("sentence",
+                    WordCountTest.SENTENCE_1), "Sentences", 0);
+            awaitSignal(signalSentence1Processed, "sentence 1 processed");
+            Thread.sleep(1000);
+
+            // crash the app
+            forkedS4App.destroy();
+
+            // recovering and making sure checkpointing still works
+            forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                    backendConf);
+
+            // add authorizations for continuing processing. Without these,
+            // the WordClassifier processor keeps waiting
+            authorizeProcessing(zk, lastWordOfSentence1 + 1,
+                    lastWordOfSentence2);
+
+            CountDownLatch sentence2Processed = new CountDownLatch(1);
+            TestUtils.watchAndSignalCreation("/classifierIteration_"
+                    + lastWordOfSentence2, sentence2Processed, zk);
+
+            gen.injectValueEvent(new KeyValue("sentence",
+                    WordCountTest.SENTENCE_2), "Sentences", 0);
+
+            awaitSignal(sentence2Processed, "sentence 2 processed");
+            Thread.sleep(1000);
+
+            // crash the app
+            forkedS4App.destroy();
+            forkedS4App = TestUtils.forkS4App(getClass().getName(),
+                    backendConf);
+
+            // add authorizations for continuing processing. Without these,
+            // the WordClassifier processor keeps waiting
+            authorizeProcessing(zk, lastWordOfSentence2 + 1,
+                    WordCountTest.TOTAL_WORDS);
+            gen.injectValueEvent(new KeyValue("sentence",
+                    WordCountTest.SENTENCE_3), "Sentences", 0);
+            awaitSignal(signalTextProcessed, "whole text processed");
+            File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                    + File.separator + "wordcount");
+            String s = TestUtils.readFile(results);
+            Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+        } finally {
+            // release the client session (and its ephemeral nodes)
+            zk.close();
+        }
+    }
+
+    /**
+     * Creates the ephemeral nodes /continue_first .. /continue_last
+     * (inclusive), one per word.
+     */
+    private static void authorizeProcessing(ZooKeeper zk, int first, int last)
+            throws Exception {
+        for (int i = first; i <= last; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+    }
+
+    /**
+     * Waits for the given signal and fails the test with an explicit message
+     * on timeout, instead of silently carrying on.
+     */
+    private static void awaitSignal(CountDownLatch signal, String description)
+            throws InterruptedException {
+        Assert.assertTrue("Timed out after " + SIGNAL_TIMEOUT_SECONDS
+                + "s waiting for signal: " + description,
+                signal.await(SIGNAL_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
new file mode 100644
index 0000000..d1032c0
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/app_conf.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <!-- <bean id="printEventPE" class="io.s4.processor.PrintEventPE"> <property
+ name="id" value="printEventPE"/> <property name="keys"> <list> <value>TopicSeen
+ topic</value> </list> </property> </bean> -->
+
+ <!-- First PE of the pipeline: keyed on every event of the Sentences stream,
+ outputs to the Words stream through wordDispatcher. -->
+ <bean id="wordSplitter" class="org.apache.s4.wordcount.WordSplitter">
+ <property name="id" value="wordSplitter" />
+ <property name="keys">
+ <list>
+ <value>Sentences *</value>
+ </list>
+ </property>
+ <property name="outputStreamName" value="Words" />
+ <property name="dispatcher" ref="wordDispatcher" />
+ </bean>
+
+ <!-- Second PE: keyed on the word attribute of the Words stream, outputs to
+ WordsCount through wordsCountDispatcher after every event, and is
+ checkpointed after every event.
+ NOTE(review): the PE id property is wordMapper while the bean id is
+ wordCounter; confirm this is intentional before changing it. -->
+ <bean id="wordCounter" class="org.apache.s4.wordcount.WordCounter">
+ <property name="id" value="wordMapper" />
+ <property name="keys">
+ <list>
+ <value>Words word</value>
+ </list>
+ </property>
+ <property name="outputFrequencyByEventCount" value="1" />
+ <property name="outputStreamName" value="WordsCount" />
+ <property name="dispatcher" ref="wordsCountDispatcher" />
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ </bean>
+
+ <!-- Last PE: keyed on routingKey of the WordsCount stream and checkpointed
+ after every event. It has no dispatcher or output stream configured. -->
+ <bean id="wordClassifier" class="org.apache.s4.wordcount.WordClassifier">
+ <property name="id" value="wordClassifier" />
+ <property name="keys">
+ <list>
+ <!-- using a common agreed-upon routing key value -->
+ <value>WordsCount routingKey</value>
+ </list>
+ </property>
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ </bean>
+
+ <!-- Dispatcher used by wordSplitter: wordPartitioner plus commLayerEmitter
+ (the emitter bean is defined in the core configuration, not in this file). -->
+ <bean id="wordDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="wordPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner for the Words stream, hashing on the word attribute. The
+ hasher bean is defined in the core configuration, not in this file. -->
+ <bean id="wordPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="streamNames">
+ <list>
+ <value>Words</value>
+ </list>
+ </property>
+ <property name="hashKey">
+ <list>
+ <value>word</value>
+ </list>
+ </property>
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <!-- Dispatcher used by wordCounter: wordsCountPartitioner plus
+ commLayerEmitter. -->
+ <bean id="wordsCountDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="wordsCountPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner for the WordsCount stream. Unlike wordPartitioner, no
+ hashKey is configured here. -->
+ <bean id="wordsCountPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="streamNames">
+ <list>
+ <value>WordsCount</value>
+ </list>
+ </property>
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+
+
+</beans>