You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/23 16:16:14 UTC
[1/2] git commit: Merge branch 'S4-81' into piper
Updated Branches:
refs/heads/piper 54c5fa232 -> e97205c8e
Merge branch 'S4-81' into piper
Conflicts:
subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/e97205c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/e97205c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/e97205c8
Branch: refs/heads/piper
Commit: e97205c8e8710553609d54eb4b728f15dad10eca
Parents: 54c5fa2 9b0f811
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Jul 23 18:11:01 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Jul 23 18:11:01 2012 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/s4/core/Main.java | 7 ++++---
.../org/apache/s4/core/ft/FTWordCountTest.java | 2 +-
.../main/java/org/apache/s4/tools/ZKServer.java | 8 ++++----
3 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e97205c8/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index e48d586,0000000..8ab093f
mode 100644,000000..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@@ -1,131 -1,0 +1,131 @@@
+package org.apache.s4.core.ft;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.base.Event;
+import org.apache.s4.base.EventMessage;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.comm.tcp.TCPEmitter;
+import org.apache.s4.core.DefaultCoreModule;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.CoreTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.apache.s4.wordcount.WordCountTest;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Test;
+
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class FTWordCountTest extends ZkBasedTest {
+
+ private Process forkedS4App;
+
+ @After
+ public void clean() throws IOException, InterruptedException {
+ CoreTestUtils.killS4App(forkedS4App);
+ }
+
+ @Test
+ public void testCheckpointAndRecovery() throws Exception {
+
+ Injector injector = Guice.createInjector(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+
+ TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
+
+ final ZooKeeper zk = CoreTestUtils.createZkClient();
+
+ restartNode();
+
+ CountDownLatch signalTextProcessed = new CountDownLatch(1);
+ CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+ // add authorizations for processing
+ for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ CountDownLatch signalSentence1Processed = new CountDownLatch(1);
+ CoreTestUtils.watchAndSignalCreation("/classifierIteration_" + WordCountTest.SENTENCE_1_TOTAL_WORDS,
+ signalSentence1Processed, zk);
+
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
+ signalSentence1Processed.await(10, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ // crash the app
+ forkedS4App.destroy();
+
+ restartNode();
+ // add authorizations for continuing processing. Without these, the
+ // WordClassifier processed keeps waiting
+ for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS
+ + WordCountTest.SENTENCE_2_TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+
+ CountDownLatch sentence2Processed = new CountDownLatch(1);
+ CoreTestUtils
+ .watchAndSignalCreation("/classifierIteration_"
+ + (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
+ sentence2Processed, zk);
+
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
+
+ sentence2Processed.await(10, TimeUnit.SECONDS);
+ Thread.sleep(1000);
+
+ // crash the app
+ forkedS4App.destroy();
+ restartNode();
+
+ // add authorizations for continuing processing. Without these, the
+ // WordClassifier processed keeps waiting
+ for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS + 1; i <= WordCountTest.TOTAL_WORDS; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
+ signalTextProcessed.await(10, TimeUnit.SECONDS);
+ File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ if (!results.exists()) {
+ // in case the results file isn't ready yet
+ Thread.sleep(1000);
+ results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ }
+ String s = CoreTestUtils.readFile(results);
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+ }
+
+ private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) {
+ Event event;
+ event = new Event();
+ event.put("sentence", String.class, sentence);
+ emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
+ .serialize(event)));
+ }
+
+ private void restartNode() throws IOException, InterruptedException {
+ CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream");
+
+ // recovering and making sure checkpointing still works
+ forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
+ FTWordCountApp.class.getName(), "-p",
- "{s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR + "}",
++ "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
+ "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
+ Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS));
+ }
+
+}