You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:28 UTC
[24/50] [abbrv] git commit: comm layer improvements
comm layer improvements
- added missing files
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/c287c857
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/c287c857
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/c287c857
Branch: refs/heads/piper
Commit: c287c857edef43908f4846b05a8e1bdfd8e3a82e
Parents: d6450a4
Author: Matthieu Morel <mm...@apache.org>
Authored: Thu Nov 10 21:07:51 2011 +0100
Committer: Matthieu Morel <mm...@apache.org>
Committed: Thu Nov 10 21:07:51 2011 +0100
----------------------------------------------------------------------
.../test/s4/core/triggers/TriggeredModule.java | 4 +-
.../java/test/s4/fixtures/GenericTestModule.java | 77 -----------
.../ZkBasedClusterManagementTestModule.java | 82 ++++++++++++
.../test/s4/wordcount/zk/WordCountModuleZk.java | 8 +
.../java/test/s4/wordcount/zk/WordCountTestZk.java | 104 +++++++++++++++
5 files changed, 196 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
index b8760c1..51c6c5f 100644
--- a/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
+++ b/subprojects/s4-core/src/test/java/test/s4/core/triggers/TriggeredModule.java
@@ -1,6 +1,6 @@
package test.s4.core.triggers;
-import test.s4.fixtures.GenericTestModule;
+import test.s4.fixtures.FileBasedClusterManagementTestModule;
-public class TriggeredModule extends GenericTestModule<TriggeredApp> {
+public class TriggeredModule extends FileBasedClusterManagementTestModule<TriggeredApp> {
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
deleted file mode 100644
index 6a87d03..0000000
--- a/subprojects/s4-core/src/test/java/test/s4/fixtures/GenericTestModule.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package test.s4.fixtures;
-
-import java.io.InputStream;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-public abstract class GenericTestModule<T> extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
- private final Class<?> appClass;
-
- protected GenericTestModule() {
- // infer actual app class through "super type tokens" (this simple code
- // assumes actual module class is a direct subclass from this one)
- ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
- Type[] fieldArgTypes = pt.getActualTypeArguments();
- this.appClass = (Class<?>) fieldArgTypes[0];
- }
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
- config = new PropertiesConfiguration();
- config.load(is);
- config.setProperty("cluster.lock_dir",
- config.getString("cluster.lock_dir").replace("{user.dir}", System.getProperty("java.io.tmpdir")));
- System.out.println(ConfigurationUtils.toString(config));
- // TODO - validate properties.
-
- /* Make all properties injectable. Do we need this? */
- Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null) {
- loadProperties(binder());
- }
- bind(appClass);
- bind(Cluster.class);
- bind(Hasher.class).to(DefaultHasher.class);
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
- bind(Assignment.class).to(AssignmentFromFile.class);
- bind(Topology.class).to(TopologyFromFile.class);
- bind(Emitter.class).to(UDPEmitter.class);
- bind(Listener.class).to(UDPListener.class);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java b/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
new file mode 100644
index 0000000..27218a4
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/fixtures/ZkBasedClusterManagementTestModule.java
@@ -0,0 +1,82 @@
+package test.s4.fixtures;
+
+import java.io.InputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.DefaultHasher;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.topology.TopologyFromZK;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.name.Names;
+
+// also uses netty
+public class ZkBasedClusterManagementTestModule<T> extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+ private final Class<?> appClass;
+
+ protected ZkBasedClusterManagementTestModule() {
+ // infer actual app class through "super type tokens" (this simple code
+ // assumes actual module class is a direct subclass from this one)
+ ParameterizedType pt = (ParameterizedType) getClass().getGenericSuperclass();
+ Type[] fieldArgTypes = pt.getActualTypeArguments();
+ this.appClass = (Class<?>) fieldArgTypes[0];
+ }
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream("/default.s4.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+ config.setProperty("cluster.zk_address",
+ config.getString("cluster.zk_address").replaceFirst("\\w+:\\d+", "localhost:" + TestUtils.ZK_PORT));
+ System.out.println(ConfigurationUtils.toString(config));
+ // TODO - validate properties.
+
+ /* Make all properties injectable. Do we need this? */
+ Names.bindProperties(binder, ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null) {
+ loadProperties(binder());
+ }
+ bind(appClass);
+ bind(Cluster.class);
+ bind(Hasher.class).to(DefaultHasher.class);
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+ bind(Assignment.class).to(AssignmentFromZK.class);
+ bind(Topology.class).to(TopologyFromZK.class);
+ bind(Emitter.class).to(NettyEmitter.class);
+ bind(Listener.class).to(NettyListener.class);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
new file mode 100644
index 0000000..8f87b8e
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountModuleZk.java
@@ -0,0 +1,8 @@
+package test.s4.wordcount.zk;
+
+import test.s4.fixtures.ZkBasedClusterManagementTestModule;
+import test.s4.wordcount.WordCountApp;
+
+public class WordCountModuleZk extends ZkBasedClusterManagementTestModule<WordCountApp> {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/c287c857/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
new file mode 100644
index 0000000..23db369
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/test/s4/wordcount/zk/WordCountTestZk.java
@@ -0,0 +1,104 @@
+package test.s4.wordcount.zk;
+
+import static org.junit.Assert.*;
+import static test.s4.wordcount.WordCountTest.*;
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ClusterNode;
+import org.apache.s4.core.App;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.junit.Before;
+import org.junit.Test;
+
+import test.s4.fixtures.TestUtils;
+import test.s4.wordcount.WordCountApp;
+import test.s4.wordcount.WordCountModule;
+
+public class WordCountTestZk {
+
+ private ZkServer zkServer;
+ private ZkClient zkClient;
+
+ @Before
+ public void prepare() {
+
+ String dataDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "data";
+ String logDir = TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "zookeeper" + File.separator + "logs";
+ TestUtils.cleanupTmpDirs();
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+
+ }
+ };
+
+ zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, TestUtils.ZK_PORT);
+ zkServer.start();
+
+ // zkClient = zkServer.getZkClient();
+ String zookeeperAddress = "localhost:" + TestUtils.ZK_PORT;
+ zkClient = new ZkClient(zookeeperAddress, 10000, 10000);
+
+ ZkClient zkClient2 = new ZkClient(zookeeperAddress, 10000, 10000);
+ zkClient2.getCreationTime("/");
+ TaskSetup taskSetup = new TaskSetup(zookeeperAddress);
+ final String clusterName = "s4-test-cluster";
+ taskSetup.clean(clusterName);
+ taskSetup.setup(clusterName, 1);
+ // final CountDownLatch latch = new CountDownLatch(10);
+ // for (int i = 0; i < 10; i++) {
+ // Runnable runnable = new Runnable() {
+ //
+ // @Override
+ // public void run() {
+ // AssignmentFromZK assignmentFromZK;
+ // try {
+ // assignmentFromZK = new AssignmentFromZK(clusterName, zookeeperAddress, 30000, 30000);
+ // ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
+ // latch.countDown();
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ // }
+ // };
+ // Thread t = new Thread(runnable);
+ // t.start();
+ // }
+ }
+
+ @Test
+ public void test() throws Exception {
+
+ final ZooKeeper zk = TestUtils.createZkClient();
+
+ App.main(new String[] { WordCountModuleZk.class.getName(), WordCountApp.class.getName() });
+
+ CountDownLatch signalTextProcessed = new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+
+ // add authorizations for processing
+ for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
+ zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ }
+ TestUtils.injectIntoStringSocketAdapter(SENTENCE_1);
+ TestUtils.injectIntoStringSocketAdapter(SENTENCE_2);
+ TestUtils.injectIntoStringSocketAdapter(SENTENCE_3);
+ signalTextProcessed.await();
+ File results = new File(TestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
+ String s = TestUtils.readFile(results);
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+ }
+}