You are viewing a plain text version of this content. The canonical link for it is here.
Posted to s4-commits@incubator.apache.org by mm...@apache.org on 2012/01/03 14:03:29 UTC
[41/50] [abbrv] git commit: Working delivery tests for UDP and Netty
Working delivery tests for UDP and Netty
Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/0d111970
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/0d111970
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/0d111970
Branch: refs/heads/piper
Commit: 0d111970536ba50084f5c30899535ac8a6e27ffd
Parents: 92f3252
Author: Karthik Kambatla <kk...@cs.purdue.edu>
Authored: Sat Oct 22 23:50:07 2011 -0400
Committer: Karthik Kambatla <kk...@cs.purdue.edu>
Committed: Sat Oct 22 23:51:25 2011 -0400
----------------------------------------------------------------------
.../test/java/org/apache/s4/comm/NettyTest.java | 88 +++++++++++++
.../java/org/apache/s4/comm/NettyTestModule.java | 96 ---------------
.../org/apache/s4/comm/SimpleDeliveryTest.java | 67 +++--------
.../src/test/java/org/apache/s4/comm/UDPTest.java | 89 +++++++++++++
.../java/org/apache/s4/comm/UDPTestModule.java | 96 ---------------
5 files changed, 192 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
new file mode 100644
index 0000000..4db680d
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTest.java
@@ -0,0 +1,88 @@
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.netty.NettyEmitter;
+import org.apache.s4.comm.netty.NettyListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class NettyTest extends SimpleDeliveryTest {
+
+ @Override
+ protected void setUp() {
+ Injector injector = Guice.createInjector(new NettyTestModule());
+ sdt = injector.getInstance(CommWrapper.class);
+ }
+
+ class NettyTestModule extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream(
+ "/s4-comm-test.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ Names.bindProperties(binder,
+ ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
+
+ int numHosts = config.getList("cluster.hosts").size();
+ boolean isCluster = numHosts > 1 ? true : false;
+ bind(Boolean.class).annotatedWith(Names.named("isCluster"))
+ .toInstance(Boolean.valueOf(isCluster));
+
+ bind(Cluster.class);
+
+ bind(Assignment.class).to(AssignmentFromFile.class);
+
+ bind(Topology.class).to(TopologyFromFile.class);
+
+ /* Use the Netty comm layer implementation. */
+ bind(Listener.class).to(NettyListener.class);
+ bind(Emitter.class).to(NettyEmitter.class);
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ bind(Integer.class).annotatedWith(
+ Names.named("emitter.send.interval")).toInstance(
+ config.getInt("emitter.send.interval"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
deleted file mode 100644
index 65c3dee..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/NettyTestModule.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.netty.NettyEmitter;
-import org.apache.s4.comm.netty.NettyListener;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/*
- * Module for s4-comm/tests
- */
-public class NettyTestModule extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
- Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- bind(Topology.class).to(TopologyFromFile.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(NettyListener.class);
- bind(Emitter.class).to(NettyEmitter.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
- .toInstance(config.getInt("emitter.send.interval"));
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
index ba85899..17a2376 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/SimpleDeliveryTest.java
@@ -2,18 +2,18 @@ package org.apache.s4.comm;
import org.apache.s4.base.Emitter;
import org.apache.s4.base.Listener;
-
-import com.google.inject.Guice;
import com.google.inject.Inject;
-import com.google.inject.Injector;
import com.google.inject.name.Named;
import junit.framework.Assert;
import junit.framework.TestCase;
/*
- * Test class to test communication protocols.
+ * Test class to test communication protocols. Because comm-layer connections must be
+ * established (which includes acquiring locks), the test is declared abstract and must
+ * be extended by a subclass for each protocol.
*
+ * At a high-level, the test accomplishes the following:
* <ul>
* <li> Create Send and Receive Threads </li>
* <li> SendThread sends out a pre-defined number of messages to all the partitions </li>
@@ -24,9 +24,10 @@ import junit.framework.TestCase;
* </ul>
*
*/
-public class SimpleDeliveryTest extends TestCase {
+public abstract class SimpleDeliveryTest extends TestCase {
+ protected CommWrapper sdt;
- class CommWrapper {
+ static class CommWrapper {
final private static int messageCount = 200;
final private static int timerThreadCount = 100;
@@ -140,54 +141,14 @@ public class SimpleDeliveryTest extends TestCase {
}
/**
- * test1() tests the UDP protocol. If all components function without
- * throwing exceptions, the test passes. As UDP doesn't guarantee message
- * delivery, the number of messages received doesn't come into play to
- * determine if it passes the test.
- *
+ * test() tests the protocol. If all components function without throwing
+ * exceptions, the test passes. The test also reports the loss of messages,
+ * if any.
*
* @throws InterruptedException
*/
- public void test1() throws InterruptedException {
- System.out.println("Testing UDP");
-
- Injector injector = Guice.createInjector(new UDPTestModule());
+ public void test() throws InterruptedException {
try {
- CommWrapper sdt = injector.getInstance(CommWrapper.class);
-
- // start send and receive threads
- sdt.sendThread.start();
- sdt.receiveThread.start();
-
- // wait for them to finish
- sdt.sendThread.join();
- sdt.receiveThread.join();
-
- // exit - system.exit is called here to revoke the lock file and
- // listener
- // sockets
- } catch (Exception e) {
- Assert.fail("UDP test has failed");
- }
- Assert.assertTrue("UDP test PASSED. Seems to work fine", true);
-
- System.out.println("Done");
- }
-
- /**
- * test2() tests the Netty TCP protocol. If all components function without
- * throwing exceptions, the test passes partially. As TCP guarantees message
- * delivery, the test checks for that too.
- *
- * @throws InterruptedException
- */
- public void test2() throws InterruptedException {
- System.out.println("Testing Netty TCP");
-
- Injector injector = Guice.createInjector(new NettyTestModule());
- try {
- CommWrapper sdt = injector.getInstance(CommWrapper.class);
-
// start send and receive threads
sdt.sendThread.start();
sdt.receiveThread.start();
@@ -199,10 +160,12 @@ public class SimpleDeliveryTest extends TestCase {
Assert.assertTrue("Guaranteed message delivery",
!sdt.moreMessages());
} catch (Exception e) {
- Assert.fail("Netty test has failed basic functionality test");
+ e.printStackTrace();
+ Assert.fail("The comm protocol has failed basic functionality test");
}
- Assert.assertTrue("Netty seems to be working crash-free", true);
+ Assert.assertTrue("The comm protocol seems to be working crash-free",
+ true);
System.out.println("Done");
}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
new file mode 100644
index 0000000..720dc28
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTest.java
@@ -0,0 +1,89 @@
+package org.apache.s4.comm;
+
+import java.io.InputStream;
+
+import org.apache.commons.configuration.ConfigurationConverter;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.s4.base.Emitter;
+import org.apache.s4.base.Hasher;
+import org.apache.s4.base.Listener;
+import org.apache.s4.base.SerializerDeserializer;
+import org.apache.s4.comm.serialize.KryoSerDeser;
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.AssignmentFromFile;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Topology;
+import org.apache.s4.comm.topology.TopologyFromFile;
+import org.apache.s4.comm.udp.UDPEmitter;
+import org.apache.s4.comm.udp.UDPListener;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.name.Names;
+
+public class UDPTest extends SimpleDeliveryTest {
+
+ @Override
+ protected void setUp() {
+ Injector injector = Guice.createInjector(new UDPTestModule());
+ sdt = injector.getInstance(CommWrapper.class);
+ }
+
+ class UDPTestModule extends AbstractModule {
+
+ protected PropertiesConfiguration config = null;
+
+ private void loadProperties(Binder binder) {
+
+ try {
+ InputStream is = this.getClass().getResourceAsStream(
+ "/s4-comm-test.properties");
+ config = new PropertiesConfiguration();
+ config.load(is);
+
+ System.out.println(ConfigurationUtils.toString(config));
+ Names.bindProperties(binder,
+ ConfigurationConverter.getProperties(config));
+ } catch (ConfigurationException e) {
+ binder.addError(e);
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ protected void configure() {
+ if (config == null)
+ loadProperties(binder());
+
+ int numHosts = config.getList("cluster.hosts").size();
+ boolean isCluster = numHosts > 1 ? true : false;
+ bind(Boolean.class).annotatedWith(Names.named("isCluster"))
+ .toInstance(Boolean.valueOf(isCluster));
+
+ bind(Cluster.class);
+
+ bind(Assignment.class).to(AssignmentFromFile.class);
+
+ bind(Topology.class).to(TopologyFromFile.class);
+
+ /* Use a simple UDP comm layer implementation. */
+ bind(Listener.class).to(UDPListener.class);
+ bind(Emitter.class).to(UDPEmitter.class);
+
+ /* The hashing function to map keys to partitions. */
+ bind(Hasher.class).to(DefaultHasher.class);
+
+ /* Use Kryo to serialize events. */
+ bind(SerializerDeserializer.class).to(KryoSerDeser.class);
+
+ bind(Integer.class).annotatedWith(
+ Names.named("emitter.send.interval")).toInstance(
+ config.getInt("emitter.send.interval"));
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/0d111970/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
----------------------------------------------------------------------
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
deleted file mode 100644
index b052e70..0000000
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/UDPTestModule.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Copyright (c) 2011 Yahoo! Inc. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file.
- */
-package org.apache.s4.comm;
-
-import java.io.InputStream;
-
-import org.apache.commons.configuration.ConfigurationConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.s4.base.Emitter;
-import org.apache.s4.base.Hasher;
-import org.apache.s4.base.Listener;
-import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultHasher;
-import org.apache.s4.comm.serialize.KryoSerDeser;
-import org.apache.s4.comm.topology.Assignment;
-import org.apache.s4.comm.topology.AssignmentFromFile;
-import org.apache.s4.comm.topology.Cluster;
-import org.apache.s4.comm.topology.Topology;
-import org.apache.s4.comm.topology.TopologyFromFile;
-import org.apache.s4.comm.udp.UDPEmitter;
-import org.apache.s4.comm.udp.UDPListener;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Binder;
-import com.google.inject.name.Names;
-
-/*
- * Module for s4-comm/tests
- */
-public class UDPTestModule extends AbstractModule {
-
- protected PropertiesConfiguration config = null;
-
- private void loadProperties(Binder binder) {
-
- try {
- InputStream is = this.getClass().getResourceAsStream(
- "/s4-comm-test.properties");
- config = new PropertiesConfiguration();
- config.load(is);
-
- System.out.println(ConfigurationUtils.toString(config));
- Names.bindProperties(binder,
- ConfigurationConverter.getProperties(config));
- } catch (ConfigurationException e) {
- binder.addError(e);
- e.printStackTrace();
- }
- }
-
- @Override
- protected void configure() {
- if (config == null)
- loadProperties(binder());
-
- int numHosts = config.getList("cluster.hosts").size();
- boolean isCluster = numHosts > 1 ? true : false;
- bind(Boolean.class).annotatedWith(Names.named("isCluster")).toInstance(
- Boolean.valueOf(isCluster));
-
- bind(Cluster.class);
-
- bind(Assignment.class).to(AssignmentFromFile.class);
-
- bind(Topology.class).to(TopologyFromFile.class);
-
- /* Use a simple UDP comm layer implementation. */
- bind(Listener.class).to(UDPListener.class);
- bind(Emitter.class).to(UDPEmitter.class);
-
- /* The hashing function to map keys top partitions. */
- bind(Hasher.class).to(DefaultHasher.class);
-
- /* Use Kryo to serialize events. */
- bind(SerializerDeserializer.class).to(KryoSerDeser.class);
-
- bind(Integer.class).annotatedWith(Names.named("emitter.send.interval"))
- .toInstance(config.getInt("emitter.send.interval"));
-
- }
-}
\ No newline at end of file