You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 11:19:15 UTC
[27/50] [abbrv] Rename packages in preparation for move to Apache
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
new file mode 100755
index 0000000..1092868
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- S4 core wiring used by the fault-tolerance word-count test. The safeKeeper bean
+       stores state through the file-system backend (fsStateStorage). Bean classes are
+       referenced by their org.apache.s4 names, matching the renamed Java packages. -->
+
+  <!-- Placeholder values come from s4_core.properties; the inline props supply the
+       Kryo buffer sizes, and unresolvable placeholders are left as they are. -->
+  <bean id="propertyConfigurer"
+        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+    <property name="location">
+      <value>classpath:s4_core.properties</value>
+    </property>
+    <property name="properties">
+      <props>
+        <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+        <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+      </props>
+    </property>
+    <property name="ignoreUnresolvablePlaceholders" value="true" />
+  </bean>
+
+  <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+  <!-- Emitter targeting the application named by ${adapter_app_name}. -->
+  <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="listener" ref="rawListener" />
+    <property name="listenerAppName" value="${adapter_app_name}" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <!-- Same emitter class without a listenerAppName. -->
+  <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="listener" ref="rawListener" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+    <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+    <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+  </bean>
+
+  <!-- START: Dispatchers for the control event processor. Events on the stream named
+       @${adapter_app_name} are routed to the adapter (via ctrlDispatcherAdapter);
+       every other stream is routed to the S4 cluster itself (via ctrlDispatcherS4). -->
+  <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+    <property name="dispatchers">
+      <list>
+        <ref bean="ctrlDispatcherFilteredS4" />
+        <ref bean="ctrlDispatcherFilteredAdapter" />
+      </list>
+    </property>
+  </bean>
+
+  <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+    <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+    <property name="streams">
+      <list>
+        <value>@${adapter_app_name}</value>
+      </list>
+    </property>
+  </bean>
+
+  <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+    <property name="dispatcher" ref="ctrlDispatcherS4" />
+    <property name="streams">
+      <list>
+        <value>@${adapter_app_name}</value>
+      </list>
+    </property>
+  </bean>
+
+  <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+    <property name="hasher" ref="hasher" />
+    <property name="debug" value="false" />
+  </bean>
+
+  <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="genericPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="genericPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+  <!-- END: Dispatchers for control events -->
+
+  <!-- Control Events handler -->
+  <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+    <property name="dispatcher" ref="ctrlDispatcher" />
+  </bean>
+
+  <!-- The "clock" bean is not defined in this file; it must come from another
+       context file (for example wall_clock.xml). -->
+  <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+        init-method="init" lazy-init="true">
+    <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+    <property name="monitor" ref="monitor" />
+    <property name="trackByKey" value="true" />
+    <property name="clock" ref="clock" />
+    <property name="controlEventProcessor" ref="ctrlHandler" />
+    <property name="safeKeeper" ref="safeKeeper" />
+  </bean>
+
+  <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="clusterManagerAddress" value="${zk_address}" />
+    <property name="appName" value="${s4_app_name}" />
+    <property name="maxQueueSize" value="${listener_max_queue_size}" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+        init-method="init">
+    <property name="rawListener" ref="rawListener" />
+    <property name="peContainer" ref="peContainer" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+        init-method="init">
+    <property name="flushInterval" value="30" />
+    <property name="loggerName" value="monitor" />
+  </bean>
+
+  <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+        lazy-init="true">
+    <property name="monitor" ref="monitor" />
+    <property name="peContainer" ref="peContainer" />
+    <property name="minimumMemory" value="52428800" />
+  </bean>
+
+
+
+
+  <!-- Some useful beans related to client-adapter for apps -->
+
+  <!-- Dispatcher to send to all adapter nodes. -->
+  <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="broadcastPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <!-- Partitioner to achieve broadcast -->
+  <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+  <!-- Dispatcher handed to safeKeeper; it partitions with loopbackPartitioner. -->
+  <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="loopbackPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+    <property name="eventEmitter" ref="commLayerEmitter"/>
+  </bean>
+
+  <!-- Fault-tolerance component injected into peContainer; state goes to fsStateStorage. -->
+  <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+    <property name="stateStorage" ref="fsStateStorage" />
+    <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+    <property name="serializer" ref="serDeser"/>
+    <property name="hasher" ref="hasher"/>
+    <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+  </bean>
+
+  <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+  <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+    <!-- if not specified, default is <current_dir>/tmp/storage
+    <property name="storageRootPath" value="${storage_root_path}" /> -->
+  </bean>
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
new file mode 100755
index 0000000..3b6e251
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/s4_core_conf_redis_backend.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- S4 core wiring used by the fault-tolerance word-count test. The safeKeeper bean
+       stores state through the Redis backend (redisStateStorage, localhost:6379).
+       Bean classes are referenced by their org.apache.s4 names, matching the renamed
+       Java packages. -->
+
+  <!-- Placeholder values come from s4_core.properties; the inline props supply the
+       Kryo buffer sizes, and unresolvable placeholders are left as they are. -->
+  <bean id="propertyConfigurer"
+        class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+    <property name="location">
+      <value>classpath:s4_core.properties</value>
+    </property>
+    <property name="properties">
+      <props>
+        <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+        <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+      </props>
+    </property>
+    <property name="ignoreUnresolvablePlaceholders" value="true" />
+  </bean>
+
+  <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+  <!-- Emitter targeting the application named by ${adapter_app_name}. -->
+  <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="listener" ref="rawListener" />
+    <property name="listenerAppName" value="${adapter_app_name}" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <!-- Same emitter class without a listenerAppName. -->
+  <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="listener" ref="rawListener" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+    <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+    <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+  </bean>
+
+  <!-- START: Dispatchers for the control event processor. Events on the stream named
+       @${adapter_app_name} are routed to the adapter (via ctrlDispatcherAdapter);
+       every other stream is routed to the S4 cluster itself (via ctrlDispatcherS4). -->
+  <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+    <property name="dispatchers">
+      <list>
+        <ref bean="ctrlDispatcherFilteredS4" />
+        <ref bean="ctrlDispatcherFilteredAdapter" />
+      </list>
+    </property>
+  </bean>
+
+  <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+    <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+    <property name="streams">
+      <list>
+        <value>@${adapter_app_name}</value>
+      </list>
+    </property>
+  </bean>
+
+  <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+    <property name="dispatcher" ref="ctrlDispatcherS4" />
+    <property name="streams">
+      <list>
+        <value>@${adapter_app_name}</value>
+      </list>
+    </property>
+  </bean>
+
+  <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+    <property name="hasher" ref="hasher" />
+    <property name="debug" value="false" />
+  </bean>
+
+  <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="genericPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="genericPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+  <!-- END: Dispatchers for control events -->
+
+  <!-- Control Events handler -->
+  <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+    <property name="dispatcher" ref="ctrlDispatcher" />
+  </bean>
+
+  <!-- The "clock" bean is not defined in this file; it must come from another
+       context file (for example wall_clock.xml). -->
+  <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+        init-method="init" lazy-init="true">
+    <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+    <property name="monitor" ref="monitor" />
+    <property name="trackByKey" value="true" />
+    <property name="clock" ref="clock" />
+    <property name="controlEventProcessor" ref="ctrlHandler" />
+    <property name="safeKeeper" ref="safeKeeper" />
+  </bean>
+
+  <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+        init-method="init">
+    <property name="serDeser" ref="serDeser" />
+    <property name="clusterManagerAddress" value="${zk_address}" />
+    <property name="appName" value="${s4_app_name}" />
+    <property name="maxQueueSize" value="${listener_max_queue_size}" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+        init-method="init">
+    <property name="rawListener" ref="rawListener" />
+    <property name="peContainer" ref="peContainer" />
+    <property name="monitor" ref="monitor" />
+  </bean>
+
+  <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+        init-method="init">
+    <property name="flushInterval" value="30" />
+    <property name="loggerName" value="monitor" />
+  </bean>
+
+  <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+        lazy-init="true">
+    <property name="monitor" ref="monitor" />
+    <property name="peContainer" ref="peContainer" />
+    <property name="minimumMemory" value="52428800" />
+  </bean>
+
+
+
+
+  <!-- Some useful beans related to client-adapter for apps -->
+
+  <!-- Dispatcher to send to all adapter nodes. -->
+  <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="broadcastPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <!-- Partitioner to achieve broadcast -->
+  <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+  <!-- Dispatcher handed to safeKeeper; it partitions with loopbackPartitioner. -->
+  <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+        init-method="init">
+    <property name="partitioners">
+      <list>
+        <ref bean="loopbackPartitioner" />
+      </list>
+    </property>
+    <property name="eventEmitter" ref="commLayerEmitter" />
+    <property name="loggerName" value="s4" />
+  </bean>
+
+  <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+    <property name="eventEmitter" ref="commLayerEmitter"/>
+  </bean>
+
+  <!-- Fault-tolerance component injected into peContainer; state goes to redisStateStorage. -->
+  <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+    <property name="stateStorage" ref="redisStateStorage" />
+    <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+    <property name="serializer" ref="serDeser"/>
+    <property name="hasher" ref="hasher"/>
+    <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+  </bean>
+
+  <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+
+  <!-- Host and port of the Redis server are hard-coded for the test. -->
+  <bean id="redisStateStorage" class="org.apache.s4.ft.RedisStateStorage" init-method="init">
+    <property name="redisHost" value="localhost"/>
+    <property name="redisPort" value="6379"/>
+  </bean>
+
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/ft/wordcount/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+  <!-- Defines the "clock" bean using the renamed org.apache.s4 WallClock class. -->
+  <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/MockPE.java b/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
new file mode 100644
index 0000000..10a38f9
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/processor/MockPE.java
@@ -0,0 +1,29 @@
+package org.apache.s4.processor;
+
+/**
+ * Minimal PE used by tests to observe how often the initialization hook runs.
+ */
+public class MockPE extends AbstractPE {
+
+    /** Number of times {@link #initInstance()} has been called on this object. */
+    private int initializeCount;
+
+    /** Records one more initialization call. */
+    public void initInstance() {
+        initializeCount += 1;
+    }
+
+    /** Accepts any event and does nothing with it. */
+    public void processEvent(Object obj) {
+        // intentionally empty
+    }
+
+    /** This mock produces no output. */
+    @Override
+    public void output() {
+        // intentionally empty
+    }
+
+    /**
+     * @return how many times {@code initInstance()} has run on this object
+     */
+    public int getInitializeCount() {
+        return this.initializeCount;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
new file mode 100644
index 0000000..445972a
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
@@ -0,0 +1,31 @@
+package org.apache.s4.processor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import org.apache.s4.util.clock.WallClock;
+
+import org.junit.Test;
+
+public class TestPrototypeWrapper {
+
+    /**
+     * Checks that asking the wrapper for a PE returns a non-null instance whose
+     * initialization hook ran exactly once, while the prototype's own count
+     * stays at zero.
+     */
+    @Test
+    public void testCloneAndInitialize() {
+        final MockPE template = new MockPE();
+        final PrototypeWrapper wrapper = new PrototypeWrapper(template, new WallClock());
+
+        // Wrapping the prototype must not initialize it.
+        assertEquals(0, template.getInitializeCount());
+
+        final MockPE created = (MockPE) wrapper.getPE("asd");
+        assertNotNull(created);
+
+        // Only the instance handed out by the wrapper is initialized.
+        assertEquals(0, template.getInitializeCount());
+        assertEquals(1, created.getInitializeCount());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/Word.java b/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
new file mode 100644
index 0000000..c1fb3f1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/Word.java
@@ -0,0 +1,22 @@
+package org.apache.s4.wordcount;
+
+/**
+ * Event payload holding a single word.
+ */
+public class Word {
+
+    /** The wrapped word; {@code null} until one is supplied. */
+    private String word;
+
+    /** Creates an instance with no word set. */
+    public Word() {
+    }
+
+    /**
+     * @param word the word to wrap
+     */
+    public Word(String word) {
+        setWord(word);
+    }
+
+    /**
+     * @return the wrapped word, possibly {@code null}
+     */
+    public String getWord() {
+        return this.word;
+    }
+
+    /**
+     * @param word the new word to wrap
+     */
+    public void setWord(String word) {
+        this.word = word;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
new file mode 100644
index 0000000..5b0558d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
@@ -0,0 +1,109 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.processor.AbstractPE;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Test PE that collects {@link WordCount} events, keeps the highest count seen
+ * per word, and uses ZooKeeper nodes to report progress and to wait for
+ * permission before handling the next event.
+ */
+public class WordClassifier extends AbstractPE implements Watcher {
+
+    /** Highest count seen so far for each word, sorted by word. */
+    TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
+    /** Number of WordCount events handled by this instance. */
+    private int counter;
+    /** ZooKeeper client, created lazily on the first event; declared transient. */
+    transient private ZooKeeper zk;
+    private String id;
+    public final static String ROUTING_KEY = "classifier";
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Records the incoming count, then either writes the final results (once
+     * {@code WordCountTest.TOTAL_WORDS} events have been seen) or publishes an
+     * iteration node and waits for the matching {@code /continue_N} node.
+     *
+     * @param wordCount the word/count pair to record
+     * @throws Exception if a ZooKeeper or file operation fails
+     */
+    public void processEvent(WordCount wordCount) throws IOException,
+            Exception, InterruptedException {
+        if (zk == null) {
+            try {
+                zk = new ZooKeeper("localhost:21810", 4000, this);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        System.out.println("seen: " + wordCount.getWord() + "/"
+                + wordCount.getCount());
+
+        // wordCount events arrive unordered, so only a strictly higher count
+        // replaces the one already recorded for that word
+        Integer known = counts.get(wordCount.getWord());
+        if (known == null || known.compareTo(wordCount.getCount()) < 0) {
+            counts.put(wordCount.getWord(), wordCount.getCount());
+        }
+        ++counter;
+        if (counter == WordCountTest.TOTAL_WORDS) {
+            File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                    + File.separator + "wordcount");
+            if (results.exists()) {
+                if (!results.delete()) {
+                    throw new RuntimeException("cannot delete results file");
+                }
+            }
+            // serialize as "word=count;" pairs in key order
+            StringBuilder sb = new StringBuilder();
+            for (Entry<String, Integer> entry : counts.entrySet()) {
+                sb.append(entry.getKey()).append('=')
+                        .append(entry.getValue()).append(';');
+            }
+            TestUtils.writeStringToFile(sb.toString(), results);
+
+            // signal that the whole text has been processed
+            zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } else {
+            // NOTE: this will fail if we did not recover the latest counter,
+            // because there is already a counter with this number in zookeeper
+            zk.create("/classifierIteration_" + counter, new byte[counter],
+                    Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            // check if we are allowed to continue
+            if (null == zk.exists("/continue_" + counter, null)) {
+                // not authorized yet: block until the node gets created
+                CountDownLatch latch = new CountDownLatch(1);
+                TestUtils.watchAndSignalCreation("/continue_" + counter, latch,
+                        zk);
+                latch.await();
+            } else {
+                // authorization already present: consume it
+                zk.delete("/continue_" + counter, -1);
+                System.out.println("");
+            }
+
+        }
+    }
+
+    /** This PE produces no periodic output. */
+    @Override
+    public void output() {
+        // intentionally empty
+    }
+
+    /** Watcher callback required by the ZooKeeper client; events are ignored. */
+    @Override
+    public void process(WatchedEvent event) {
+        // intentionally empty
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
new file mode 100644
index 0000000..2ea5b09
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCount.java
@@ -0,0 +1,43 @@
+package org.apache.s4.wordcount;
+
+/**
+ * Event payload pairing a word with its count and the key used for routing.
+ */
+public class WordCount {
+
+    private String word;
+    private int count;
+    private String routingKey;
+
+    /** Creates an instance with no word, a zero count and no routing key. */
+    public WordCount() {
+    }
+
+    /**
+     * @param word       the counted word
+     * @param count      the count for that word
+     * @param routingKey the value exposed through {@link #getRoutingKey()}
+     */
+    public WordCount(String word, int count, String routingKey) {
+        this.routingKey = routingKey;
+        this.count = count;
+        this.word = word;
+    }
+
+    /** @return the counted word */
+    public String getWord() {
+        return this.word;
+    }
+
+    /** @param word the counted word */
+    public void setWord(String word) {
+        this.word = word;
+    }
+
+    /** @return the count for the word */
+    public int getCount() {
+        return this.count;
+    }
+
+    /** @param count the count for the word */
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    /** @return the routing key */
+    public String getRoutingKey() {
+        return this.routingKey;
+    }
+
+    /** @param routingKey the routing key */
+    public void setRoutingKey(String routingKey) {
+        this.routingKey = routingKey;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
new file mode 100644
index 0000000..5a511d7
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -0,0 +1,83 @@
+package org.apache.s4.wordcount;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4App;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * End-to-end word-count test: injects three sentences into an S4 app and checks
+ * the results file written once all words have been processed.
+ */
+public class WordCountTest extends S4TestCase {
+    public static final String SENTENCE_1 = "to be or not to be doobie doobie da";
+    public static final int SENTENCE_1_TOTAL_WORDS = SENTENCE_1.split(" ").length;
+    public static final String SENTENCE_2 = "doobie doobie da";
+    public static final int SENTENCE_2_TOTAL_WORDS = SENTENCE_2.split(" ").length;
+    public static final String SENTENCE_3 = "doobie";
+    public static final int SENTENCE_3_TOTAL_WORDS = SENTENCE_3.split(" ").length;
+    public static final String FLAG = ";";
+    /** Number of words across the three sentences. */
+    public static int TOTAL_WORDS = SENTENCE_1_TOTAL_WORDS
+            + SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
+    private static Factory zookeeperServerConnectionFactory;
+
+    /** Clears temporary directories and starts a ZooKeeper server before each test. */
+    @Before
+    public void prepare() throws IOException, InterruptedException, KeeperException {
+        TestUtils.cleanupTmpDirs();
+        zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+    }
+
+    /**
+     * Starts the app, pre-creates the {@code /continue_N} nodes, injects the
+     * sentences, waits for {@code /textProcessed} and compares the results file.
+     */
+    @Test
+    public void testSimple() throws Exception {
+        S4App s4App = new S4App(getClass(), "s4_core_conf.xml");
+        s4App.initializeS4App();
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        CountDownLatch textProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", textProcessed, zk);
+        EventGenerator generator = new EventGenerator();
+
+        // add authorizations for processing
+        final int lastAuthorized = SENTENCE_1_TOTAL_WORDS
+                + SENTENCE_2_TOTAL_WORDS + 1;
+        for (int step = 1; step <= lastAuthorized; step++) {
+            zk.create("/continue_" + step, new byte[0], Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL);
+        }
+
+        generator.injectValueEvent(new KeyValue("sentence", SENTENCE_1), "Sentences", 0);
+        generator.injectValueEvent(new KeyValue("sentence", SENTENCE_2), "Sentences", 0);
+        generator.injectValueEvent(new KeyValue("sentence", SENTENCE_3), "Sentences", 0);
+
+        // block until the /textProcessed node has been created
+        textProcessed.await();
+
+        File resultsFile = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
+                + File.separator + "wordcount");
+        String actual = TestUtils.readFile(resultsFile);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", actual);
+    }
+
+    /** Stops the ZooKeeper server started in {@link #prepare()}. */
+    @After
+    public void cleanup() throws IOException, InterruptedException {
+        TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
new file mode 100644
index 0000000..de7c291
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordCounter.java
@@ -0,0 +1,60 @@
+package org.apache.s4.wordcount;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.processor.AbstractPE;
+
+public class WordCounter extends AbstractPE {
+
+ private transient EventDispatcher dispatcher;
+ private String outputStreamName;
+ int wordCounter;
+ private String id;
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ public EventDispatcher getDispatcher() {
+ return dispatcher;
+ }
+
+ public void setDispatcher(EventDispatcher dispatcher) {
+ this.dispatcher = dispatcher;
+ }
+
+ public String getOutputStreamName() {
+ return outputStreamName;
+ }
+
+ public void setOutputStreamName(String outputStreamName) {
+ this.outputStreamName = outputStreamName;
+ }
+
+ public void processEvent(Word word) {
+ wordCounter++;
+ // System.out.println("seen word " + word.getWord());
+ }
+
+ @Override
+ public void output() {
+ System.out.println("dispatch: " + getKeyValueString() + " / "
+ + wordCounter);
+ List<List<String>> compoundKeyNames = new ArrayList<List<String>>();
+ List<String> keyNames = new ArrayList<String>();
+ keyNames.add("routingKey");
+ compoundKeyNames.add(keyNames);
+ dispatcher.dispatchEvent(outputStreamName, compoundKeyNames,
+ new WordCount(getKeyValueString(), wordCounter,
+ WordClassifier.ROUTING_KEY));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java b/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
new file mode 100644
index 0000000..7f9715d
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/WordSplitter.java
@@ -0,0 +1,61 @@
+package org.apache.s4.wordcount;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.s4.dispatcher.EventDispatcher;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.processor.AbstractPE;
+
+/**
+ * Test PE that splits the value of "sentence" events on single spaces and
+ * dispatches one {@link Word} event per token.
+ */
+public class WordSplitter extends AbstractPE {
+
+    /** Dispatcher used to publish words; declared transient. */
+    private transient EventDispatcher dispatcher;
+    /** Name of the stream the words are published on. */
+    private String outputStreamName;
+    private String id;
+
+    public EventDispatcher getDispatcher() {
+        return this.dispatcher;
+    }
+
+    public void setDispatcher(EventDispatcher dispatcher) {
+        this.dispatcher = dispatcher;
+    }
+
+    public String getOutputStreamName() {
+        return this.outputStreamName;
+    }
+
+    public void setOutputStreamName(String outputStreamName) {
+        this.outputStreamName = outputStreamName;
+    }
+
+    @Override
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Dispatches every space-separated token of the event value as a
+     * {@link Word}, with "word" as the only key name. Events whose key is not
+     * "sentence" are ignored.
+     *
+     * @param keyValue the incoming key/value event
+     */
+    public void processEvent(KeyValue keyValue) {
+        if (!"sentence".equals(keyValue.getKey())) {
+            return;
+        }
+
+        List<String> wordKeyNames = new ArrayList<String>(1);
+        wordKeyNames.add("word");
+        List<List<String>> keyGroups = new ArrayList<List<String>>();
+        keyGroups.add(wordKeyNames);
+
+        for (String token : keyValue.getValue().split(" ")) {
+            dispatcher.dispatchEvent(outputStreamName, keyGroups, new Word(token));
+        }
+    }
+
+    /** This PE produces no periodic output. */
+    @Override
+    public void output() {
+        // intentionally empty
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml b/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
new file mode 100644
index 0000000..f03ea26
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/app_conf.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+    <!-- Word-count test application. Stream wiring as configured below:
+         wordSplitter reads Sentences and writes Words; wordCounter reads
+         Words and writes WordsCount; wordClassifier reads WordsCount. -->
+
+    <!-- Disabled PE that prints events, kept for debugging:
+    <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
+        <property name="id" value="printEventPE" />
+        <property name="keys">
+            <list>
+                <value>TopicSeen topic</value>
+            </list>
+        </property>
+    </bean>
+    -->
+
+    <bean id="wordSplitter" class="org.apache.s4.wordcount.WordSplitter">
+        <property name="id" value="wordSplitter" />
+        <property name="keys">
+            <list>
+                <value>Sentences *</value>
+            </list>
+        </property>
+        <property name="outputStreamName" value="Words" />
+        <property name="dispatcher" ref="wordDispatcher" />
+    </bean>
+
+    <bean id="wordCounter" class="org.apache.s4.wordcount.WordCounter">
+        <property name="id" value="wordCounter" />
+        <property name="keys">
+            <list>
+                <value>Words word</value>
+            </list>
+        </property>
+        <property name="outputFrequencyByEventCount" value="1" />
+        <property name="outputStreamName" value="WordsCount" />
+        <property name="dispatcher" ref="wordsCountDispatcher" />
+    </bean>
+
+    <bean id="wordClassifier" class="org.apache.s4.wordcount.WordClassifier">
+        <property name="id" value="wordClassifier" />
+        <property name="keys">
+            <list>
+                <!-- use a predefined agreed-upon key -->
+                <value>WordsCount routingKey</value>
+            </list>
+        </property>
+    </bean>
+
+    <!-- Dispatcher used by wordSplitter; its partitioner hashes the Words stream on "word" -->
+    <bean id="wordDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="wordPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="wordPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+        <property name="streamNames">
+            <list>
+                <value>Words</value>
+            </list>
+        </property>
+        <property name="hashKey">
+            <list>
+                <value>word</value>
+            </list>
+        </property>
+        <property name="hasher" ref="hasher" />
+        <property name="debug" value="false" />
+    </bean>
+
+    <!-- Dispatcher used by wordCounter for the WordsCount stream -->
+    <bean id="wordsCountDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="wordsCountPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="wordsCountPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+        <property name="streamNames">
+            <list>
+                <value>WordsCount</value>
+            </list>
+        </property>
+        <property name="hasher" ref="hasher" />
+        <property name="debug" value="false" />
+    </bean>
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml b/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
new file mode 100755
index 0000000..20c8bb1
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/s4_core_conf.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+    <!-- S4 core wiring for the word-count test: emitters and listener, control
+         event dispatchers, PE container, and state storage on the file system. -->
+
+    <bean id="propertyConfigurer"
+          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+        <property name="location">
+            <value>classpath:s4_core.properties</value>
+        </property>
+        <property name="properties">
+            <props>
+                <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+                <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+            </props>
+        </property>
+        <property name="ignoreUnresolvablePlaceholders" value="true" />
+    </bean>
+
+    <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+    <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+          init-method="init">
+        <property name="serDeser" ref="serDeser" />
+        <property name="listener" ref="rawListener" />
+        <property name="listenerAppName" value="${adapter_app_name}" />
+        <property name="monitor" ref="monitor" />
+    </bean>
+
+    <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+          init-method="init">
+        <property name="serDeser" ref="serDeser" />
+        <property name="listener" ref="rawListener" />
+        <property name="monitor" ref="monitor" />
+    </bean>
+
+    <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+        <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+        <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+    </bean>
+
+    <!--START: Dispatchers for control event processor. If stream name in Response
+        is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+        Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+    <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+        <property name="dispatchers">
+            <list>
+                <ref bean="ctrlDispatcherFilteredS4" />
+                <ref bean="ctrlDispatcherFilteredAdapter" />
+            </list>
+        </property>
+    </bean>
+
+    <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+        <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+        <property name="streams">
+            <list>
+                <value>@${adapter_app_name}</value>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+        <property name="dispatcher" ref="ctrlDispatcherS4" />
+        <property name="streams">
+            <list>
+                <value>@${adapter_app_name}</value>
+            </list>
+        </property>
+    </bean>
+
+    <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+        <property name="hasher" ref="hasher" />
+        <property name="debug" value="false" />
+    </bean>
+
+    <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="genericPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="genericPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+    <!-- END: Dispatchers for control events -->
+
+    <!-- Control Events handler -->
+    <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+        <property name="dispatcher" ref="ctrlDispatcher" />
+    </bean>
+
+    <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+          init-method="init" lazy-init="true">
+        <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+        <property name="monitor" ref="monitor" />
+        <property name="trackByKey" value="true" />
+        <property name="clock" ref="clock" />
+        <property name="controlEventProcessor" ref="ctrlHandler" />
+        <property name="safeKeeper" ref="safeKeeper" />
+    </bean>
+
+    <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+          init-method="init">
+        <property name="serDeser" ref="serDeser" />
+        <property name="clusterManagerAddress" value="${zk_address}" />
+        <property name="appName" value="${s4_app_name}" />
+        <property name="maxQueueSize" value="${listener_max_queue_size}" />
+        <property name="monitor" ref="monitor" />
+    </bean>
+
+    <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+          init-method="init">
+        <property name="rawListener" ref="rawListener" />
+        <property name="peContainer" ref="peContainer" />
+        <property name="monitor" ref="monitor" />
+    </bean>
+
+    <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+          init-method="init">
+        <property name="flushInterval" value="30" />
+        <property name="loggerName" value="monitor" />
+    </bean>
+
+    <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+          lazy-init="true">
+        <property name="monitor" ref="monitor" />
+        <property name="peContainer" ref="peContainer" />
+        <property name="minimumMemory" value="52428800" />
+    </bean>
+
+
+
+
+    <!-- Some useful beans related to client-adapter for apps -->
+
+    <!-- Dispatcher to send to all adapter nodes. -->
+    <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="broadcastPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <!-- Partitioner to achieve broadcast -->
+    <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+    <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+          init-method="init">
+        <property name="partitioners">
+            <list>
+                <ref bean="loopbackPartitioner" />
+            </list>
+        </property>
+        <property name="eventEmitter" ref="commLayerEmitter" />
+        <property name="loggerName" value="s4" />
+    </bean>
+
+    <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+        <property name="eventEmitter" ref="commLayerEmitter"/>
+    </bean>
+
+    <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+        <property name="stateStorage" ref="fsStateStorage" />
+        <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+        <property name="serializer" ref="serDeser"/>
+        <property name="hasher" ref="hasher"/>
+    </bean>
+
+    <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="checkStorageDir">
+        <!-- if not specified, default is <current_dir>/tmp/storage
+        <property name="storageRootPath" value="${storage_root_path}" /> -->
+    </bean>
+
+
+
+</beans>
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/2f93667d/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
----------------------------------------------------------------------
diff --git a/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml b/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
new file mode 100644
index 0000000..e149ecc
--- /dev/null
+++ b/s4-core/src/test/java/org/apache/s4/wordcount/wall_clock.xml
@@ -0,0 +1,9 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+    <!-- Wall-clock implementation registered under the bean id "clock" -->
+    <bean id="clock" class="org.apache.s4.util.clock.WallClock" />
+
+</beans>