You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/07/23 16:16:14 UTC

[1/2] git commit: Merge branch 'S4-81' into piper

Updated Branches:
  refs/heads/piper 54c5fa232 -> e97205c8e


Merge branch 'S4-81' into piper

Conflicts:
	subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/e97205c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/e97205c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/e97205c8

Branch: refs/heads/piper
Commit: e97205c8e8710553609d54eb4b728f15dad10eca
Parents: 54c5fa2 9b0f811
Author: Matthieu Morel <mm...@apache.org>
Authored: Mon Jul 23 18:11:01 2012 +0200
Committer: Matthieu Morel <mm...@apache.org>
Committed: Mon Jul 23 18:11:01 2012 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/s4/core/Main.java     |    7 ++++---
 .../org/apache/s4/core/ft/FTWordCountTest.java     |    2 +-
 .../main/java/org/apache/s4/tools/ZKServer.java    |    8 ++++----
 3 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/e97205c8/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
----------------------------------------------------------------------
diff --cc subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index e48d586,0000000..8ab093f
mode 100644,000000..100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@@ -1,131 -1,0 +1,131 @@@
 +package org.apache.s4.core.ft;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +
 +import junit.framework.Assert;
 +
 +import org.apache.s4.base.Event;
 +import org.apache.s4.base.EventMessage;
 +import org.apache.s4.base.SerializerDeserializer;
 +import org.apache.s4.comm.DefaultCommModule;
 +import org.apache.s4.comm.tcp.TCPEmitter;
 +import org.apache.s4.core.DefaultCoreModule;
 +import org.apache.s4.fixtures.CommTestUtils;
 +import org.apache.s4.fixtures.CoreTestUtils;
 +import org.apache.s4.fixtures.ZkBasedTest;
 +import org.apache.s4.wordcount.WordCountTest;
 +import org.apache.zookeeper.CreateMode;
 +import org.apache.zookeeper.ZooDefs.Ids;
 +import org.apache.zookeeper.ZooKeeper;
 +import org.junit.After;
 +import org.junit.Test;
 +
 +import com.google.common.io.Resources;
 +import com.google.inject.Guice;
 +import com.google.inject.Injector;
 +
 +public class FTWordCountTest extends ZkBasedTest { // checkpoint/recovery test: forks FTWordCountApp nodes, destroys them between sentences, then checks the final word counts
 +
 +    private Process forkedS4App; // handle of the most recently forked node; reassigned by every restartNode() call
 +
 +    @After
 +    public void clean() throws IOException, InterruptedException { // JUnit @After hook
 +        CoreTestUtils.killS4App(forkedS4App); // hands the last forked node to the test utility for shutdown
 +    }
 +
 +    @Test
 +    public void testCheckpointAndRecovery() throws Exception { // three sentences; the node is destroyed and re-forked after sentence 1 and after sentence 2
 +
 +        Injector injector = Guice.createInjector( // test-side injector, configured from property files found on the classpath
 +                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
 +                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
 +
 +        TCPEmitter emitter = injector.getInstance(TCPEmitter.class); // used by injectSentence() to send events
 +
 +        final ZooKeeper zk = CoreTestUtils.createZkClient(); // NOTE(review): never closed in this method - confirm the base class or fixtures clean it up
 +
 +        restartNode(); // initial start: no node has been forked yet at this point
 +
 +        CountDownLatch signalTextProcessed = new CountDownLatch(1); // only awaited at the end, after sentence 3 has been injected
 +        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk); // presumably counts the latch down once the znode is created - TODO confirm
 +
 +        // add authorizations for processing: one ephemeral /continue_<i> znode for each i in 1..SENTENCE_1_TOTAL_WORDS
 +        for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
 +            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 +        }
 +
 +        CountDownLatch signalSentence1Processed = new CountDownLatch(1);
 +        CoreTestUtils.watchAndSignalCreation("/classifierIteration_" + WordCountTest.SENTENCE_1_TOTAL_WORDS,
 +                signalSentence1Processed, zk);
 +
 +        injectSentence(injector, emitter, WordCountTest.SENTENCE_1);
 +        signalSentence1Processed.await(10, TimeUnit.SECONDS); // NOTE(review): boolean result ignored, so a timeout goes unreported here (restartNode() asserts its await) - confirm this is intended
 +        Thread.sleep(1000); // presumably leaves time for checkpointing before the crash - TODO confirm
 +
 +        // crash the app by destroying the forked process
 +        forkedS4App.destroy();
 +
 +        restartNode(); // forks a replacement node and waits for its consumer-ready signal
 +        // add authorizations for continuing processing (the /continue_<i> znodes for
 +        // sentence 2). Without these, the WordClassifier keeps waiting
 +        for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS
 +                + WordCountTest.SENTENCE_2_TOTAL_WORDS; i++) {
 +            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 +        }
 +
 +        CountDownLatch sentence2Processed = new CountDownLatch(1);
 +        CoreTestUtils
 +                .watchAndSignalCreation("/classifierIteration_"
 +                        + (WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS),
 +                        sentence2Processed, zk);
 +
 +        injectSentence(injector, emitter, WordCountTest.SENTENCE_2);
 +
 +        sentence2Processed.await(10, TimeUnit.SECONDS); // NOTE(review): result ignored here as well - confirm
 +        Thread.sleep(1000); // same fixed pause as after sentence 1
 +
 +        // crash the app a second time, then fork a replacement node
 +        forkedS4App.destroy();
 +        restartNode();
 +
 +        // add authorizations for continuing processing (the remaining /continue_<i>
 +        // znodes, up to TOTAL_WORDS). Without these, the WordClassifier keeps waiting
 +        for (int i = WordCountTest.SENTENCE_1_TOTAL_WORDS + WordCountTest.SENTENCE_2_TOTAL_WORDS + 1; i <= WordCountTest.TOTAL_WORDS; i++) {
 +            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
 +        }
 +        injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
 +        signalTextProcessed.await(10, TimeUnit.SECONDS); // NOTE(review): result ignored; the results-file check below is the only verification
 +        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
 +        if (!results.exists()) {
 +            // in case the results file isn't ready yet: wait one more second, then resolve the same path again
 +            Thread.sleep(1000);
 +            results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
 +        }
 +        String s = CoreTestUtils.readFile(results);
 +        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s); // exact expected content of the results file
 +
 +    }
 +
 +    private void injectSentence(Injector injector, TCPEmitter emitter, String sentence) { // sends one sentence as an event on "inputStream"
 +        Event event;
 +        event = new Event();
 +        event.put("sentence", String.class, sentence); // the sentence travels under the "sentence" key
 +        emitter.send(0, new EventMessage("-1", "inputStream", injector.getInstance(SerializerDeserializer.class)
 +                .serialize(event))); // first send() argument 0 is presumably the target partition - TODO confirm
 +    }
 +
 +    private void restartNode() throws IOException, InterruptedException { // forks a node and stores its handle in forkedS4App
 +        CountDownLatch signalConsumerReady = RecoveryTest.getConsumerReadySignal("inputStream"); // obtained before forking, awaited below
 +
 +        // recovering and making sure checkpointing still works
 +        forkedS4App = CoreTestUtils.forkS4Node(new String[] { "-c", "cluster1", "-appClass",
 +                FTWordCountApp.class.getName(), "-p",
-                 "{s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR + "}",
++                "s4.checkpointing.filesystem.storageRootPath=" + CommTestUtils.DEFAULT_STORAGE_DIR,
 +                "-extraModulesClasses", FileSystemBackendCheckpointingModule.class.getName() });
 +        Assert.assertTrue(signalConsumerReady.await(20, TimeUnit.SECONDS)); // fails the test if the signal does not arrive within 20 seconds
 +    }
 +
 +}