You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC

[24/50] [abbrv] git commit: comm layer improvements

comm layer improvements

- added missing files

Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/c287c857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/c287c857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/c287c857

Branch: refs/heads/piper
Commit: c287c857edef43908f4846b05a8e1bdfd8e3a82e
Parents: d6450a4
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Nov 10 21:07:51 2011 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Nov 10 21:07:51 2011 +0100

----------------------------------------------------------------------
 .../test/s4/core/triggers/TriggeredModule.java     |    4 +-
 .../java/test/s4/fixtures/GenericTestModule.java   |   77 -----------
 .../ZkBasedClusterManagementTestModule.java        |   82 ++++++++++++
 .../test/s4/wordcount/zk/WordCountModuleZk.java    |    8 +
 .../java/test/s4/wordcount/zk/WordCountTestZk.java |  104 +++++++++++++++
 5 files changed, 196 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
index b8760c1..51c6c5f 100644
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
+++ b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
@@ -1,6 +1,6 @@
 package test.s4.core.triggers;
 
-import test.s4.fixtures.GenericTestModule;
+import test.s4.fixtures.FileBasedClusterManagementTestModule;
 
-public class TriggeredModule extends GenericTestModule<TriggeredApp> {
+public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
 }

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
deleted file mode 100644
index 6a87d03..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package test.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class GenericTestModule<T> extends AbstractModule {
-
-    protected PropertiesConfiguration config = null;
-    private final Class<?> appClass;
-
-    protected GenericTestModule() {
-        // infer actual app class through "super type tokens" (this simple code
-        // assumes actual module class is a direct subclass from this one)
-        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
-        Type[] fieldArgTypes = pt.getActualTypeArguments();
-        this.appClass = (Class<?>) fieldArgTypes[0];
-    }
-
-    private void loadProperties(Binder binder) {
-
-        try {
-            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
-            config = new PropertiesConfiguration();
-            config.load(is);
-            config.setProperty("cluster.lock_dir",
-                    config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
-            System.out.println(ConfigurationUtils.toString(config));
-            // TODO - validate properties.
-
-            /* Make all properties injectable. Do we need this? */
-            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
-        } catch (ConfigurationException e) {
-            binder.addError(e);
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    protected void configure() {
-        if (config == null) {
-            loadProperties(binder());
-        }
-        bind(appClass);
-        bind(Cluster.class);
-        bind(Hasher.class).to(DefaultHasher.class);
-        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-        bind(Assignment.class).to(AssignmentFromFile.class);
-        bind(Topology.class).to(TopologyFromFile.class);
-        bind(Emitter.class).to(UDPEmitter.class);
-        bind(Listener.class).to(UDPListener.class);
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..27218a4
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -0,0 +1,82 @@
+package test.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+// also uses netty
+public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+
+    protected PropertiesConfiguration config = null;
+    private final Class<?> appClass;
+
+    protected ZkBasedClusterManagementTestModule() {
+        // infer actual app class through "super type tokens" (this simple code
+        // assumes actual module class is a direct subclass from this one)
+        ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+        Type[] fieldArgTypes = pt.getActualTypeArguments();
+        this.appClass = (Class<?>) fieldArgTypes[0];
+    }
+
+    private void loadProperties(Binder binder) {
+
+        try {
+            InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+            config = new PropertiesConfiguration();
+            config.load(is);
+            config.setProperty("cluster.zk_address",
+                    config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
+            System.out.println(ConfigurationUtils.toString(config));
+            // TODO - validate properties.
+
+            /* Make all properties injectable. Do we need this? */
+            Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+        } catch (ConfigurationException e) {
+            binder.addError(e);
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    protected void configure() {
+        if (config == null) {
+            loadProperties(binder());
+        }
+        bind(appClass);
+        bind(Cluster.class);
+        bind(Hasher.class).to(DefaultHasher.class);
+        bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+        bind(Assignment.class).to(AssignmentFromZK.class);
+        bind(Topology.class).to(TopologyFromZK.class);
+        bind(Emitter.class).to(NettyEmitter.class);
+        bind(Listener.class).to(NettyListener.class);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
new file mode 100644
index 0000000..8f87b8e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
@@ -0,0 +1,8 @@
+package test.s4.wordcount.zk;
+
+import test.s4.fixtures.ZkBasedClusterManagementTestModule;
+import test.s4.wordcount.WordCountApp;
+
+public class WordCountModuleZk extends ZkBasedClusterManagementTestModule<WordCountApp> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
new file mode 100644
index 0000000..23db369
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
@@ -0,0 +1,104 @@
+package test.s4.wordcount.zk;
+
+import static org.junit.Assert.*;
+import static test.s4.wordcount.WordCountTest.*;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.App;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import test.s4.fixtures.TestUtils;
+import test.s4.wordcount.WordCountApp;
+import test.s4.wordcount.WordCountModule;
+
+public class WordCountTestZk {
+
+    private ZkServer zkServer;
+    private ZkClient zkClient;
+
+    @Before
+    public void prepare() {
+
+        String dataDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+        String logDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+        TestUtils.cleanupTmpDirs();
+
+        IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+            @Override
+            public void createDefaultNameSpace(ZkClient zkClient) {
+
+            }
+        };
+
+        zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, TestUtils.ZK_PORT);
+        zkServer.start();
+
+        // zkClient = zkServer.getZkClient();
+        String zookeeperAddress = "localhost:" + TestUtils.ZK_PORT;
+        zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
+
+        ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
+        zkClient2.getCreationTime("/");
+        TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+        final String clusterName = "s4-test-cluster";
+        taskSetup.clean(clusterName);
+        taskSetup.setup(clusterName, 1);
+        // final CountDownLatch latch = new CountDownLatch(10);
+        // for (int i = 0; i < 10; i++) {
+        // Runnable runnable = new Runnable() {
+        //
+        // @Override
+        // public void run() {
+        // AssignmentFromZK assignmentFromZK;
+        // try {
+        // assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+        // ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+        // latch.countDown();
+        // } catch (Exception e) {
+        // e.printStackTrace();
+        // }
+        // }
+        // };
+        // Thread t = new Thread(runnable);
+        // t.start();
+        // }
+    }
+
+    @Test
+    public void test() throws Exception {
+
+        final ZooKeeper zk = TestUtils.createZkClient();
+
+        App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
+
+        CountDownLatch signalTextProcessed = new CountDownLatch(1);
+        TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+        // add authorizations for processing
+        for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+            zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+        TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+        signalTextProcessed.await();
+        File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+        String s = TestUtils.readFile(results);
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+    }
+}